[
https://issues.apache.org/jira/browse/AIRFLOW-5354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045768#comment-17045768
]
ASF GitHub Bot commented on AIRFLOW-5354:
-----------------------------------------
ashb commented on pull request #7552: [AIRFLOW-5354] Reduce scheduler CPU usage
from 100%
URL: https://github.com/apache/airflow/pull/7552
A previous change left the scheduler busily checking whether the
DagFileProcessorAgent had collected any dags. This simple change
takes the scheduler's CPU usage from an entire core to barely
anything (it dropped below 5%).
Time for 10 dag runs of 9 dags with 108 total tasks: 50.3581s (±9.538s),
vs. 49.6910s (±7.193s) on master.
The timing difference is negligible; this is a quick fix for now, and
bigger changes around DAG parsing are in store anyway.
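The fix, in other words, swaps a zero-timeout poll of the agent's result
channel for a blocking wait with a timeout. A minimal sketch of the two
behaviours, using a bare multiprocessing pipe to stand in for the agent's
result channel (names here are illustrative, not the actual patch):

```python
import multiprocessing

def busy_wait_harvest(conn):
    """Pre-fix behaviour: poll(0) returns immediately, so the loop
    spins as fast as it can and pegs an entire core while idle."""
    while not conn.poll(0):   # returns instantly when nothing is queued
        pass                  # burn CPU re-checking
    return conn.recv()

def blocking_harvest(conn, timeout=1.0):
    """Post-fix behaviour: poll(timeout) sleeps in the kernel until a
    message arrives or the timeout elapses, so idle CPU drops to ~0."""
    if conn.poll(timeout):    # blocks for up to `timeout` seconds
        return conn.recv()
    return None               # nothing parsed during this interval

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    child_conn.send("DagParsingStat")     # simulate the agent reporting in
    print(blocking_harvest(parent_conn))  # -> DagParsingStat
```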
---
Issue link: WILL BE INSERTED BY
[boring-cyborg](https://github.com/kaxil/boring-cyborg)
Make sure to mark the boxes below before creating PR:
- [x] Description above provides context of the change
- [x] Commit message/PR title starts with `[AIRFLOW-NNNN]`. AIRFLOW-NNNN =
JIRA ID<sup>*</sup>
- [x] Unit tests coverage for changes (not needed for documentation changes)
- [x] Commits follow "[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)"
- [x] Relevant documentation is updated including usage instructions.
- [x] I will engage committers as explained in [Contribution Workflow
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
<sup>*</sup> For document-only changes commit message can start with
`[AIRFLOW-XXXX]`.
---
In case of fundamental code change, Airflow Improvement Proposal
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
is needed.
In case of a new dependency, check compliance with the [ASF 3rd Party
License Policy](https://www.apache.org/legal/resolved.html#category-x).
In case of backwards incompatible changes please leave a note in
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
Read the [Pull Request
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
for more information.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Scheduler - constant CPU usage of 25% with nothing running and scheduling
> loop running too frequently
> -----------------------------------------------------------------------------------------------------
>
> Key: AIRFLOW-5354
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5354
> Project: Apache Airflow
> Issue Type: Improvement
> Components: scheduler
> Affects Versions: 1.10.4
> Reporter: t oo
> Assignee: Ash Berlin-Taylor
> Priority: Major
>
> I've set the logging level to DEBUG; CPU utilisation is a constant 25% but no
> dags are running (they have no schedule, since I only trigger them externally).
> Why is the scheduling loop running every 2 seconds? Can I make it run every 30
> seconds?
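>
> The per-loop sleep in 1.10.x appears to come from the processor_poll_interval
> option under [scheduler] (1 second by default, which matches the "Sleeping for
> 1.00 seconds" lines below), so presumably a 30-second loop would be configured
> like this in airflow.cfg:
>
> [scheduler]
> # Assumed tuning: sleep 30 seconds between scheduling loop iterations.
> processor_poll_interval = 30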
>
> here is some of the scheduler log
>
> [2019-08-30 09:28:57,538] {scheduler_job.py:1363} DEBUG - Starting Loop...
> [2019-08-30 09:28:57,539] {scheduler_job.py:1374} DEBUG - Harvesting DAG parsing results
> [2019-08-30 09:28:57,539] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:28:57,539] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:28:57,539] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:28:57,539] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:28:57,540] {scheduler_job.py:1376} DEBUG - Harvested 0 SimpleDAGs
> [2019-08-30 09:28:57,540] {scheduler_job.py:1411} DEBUG - Heartbeating the executor
> [2019-08-30 09:28:57,540] {base_executor.py:124} DEBUG - 0 running task instances
> [2019-08-30 09:28:57,540] {base_executor.py:125} DEBUG - 0 in queue
> [2019-08-30 09:28:57,540] {base_executor.py:126} DEBUG - 3 open slots
> [2019-08-30 09:28:57,540] {base_executor.py:135} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
> [2019-08-30 09:28:57,541] {scheduler_job.py:1432} DEBUG - Ran scheduling loop in 0.00 seconds
> [2019-08-30 09:28:57,541] {scheduler_job.py:1435} DEBUG - Sleeping for 1.00 seconds
> [2019-08-30 09:28:58,543] {scheduler_job.py:1447} DEBUG - Sleeping for 1.00 seconds to prevent excessive logging
> [2019-08-30 09:28:59,541] {scheduler_job.py:1363} DEBUG - Starting Loop...
> [2019-08-30 09:28:59,541] {scheduler_job.py:1374} DEBUG - Harvesting DAG parsing results
> [2019-08-30 09:28:59,541] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:28:59,542] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:28:59,542] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:28:59,542] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:28:59,542] {scheduler_job.py:1376} DEBUG - Harvested 0 SimpleDAGs
> [2019-08-30 09:28:59,542] {scheduler_job.py:1411} DEBUG - Heartbeating the executor
> [2019-08-30 09:28:59,542] {base_executor.py:124} DEBUG - 0 running task instances
> [2019-08-30 09:28:59,543] {base_executor.py:125} DEBUG - 0 in queue
> [2019-08-30 09:28:59,543] {base_executor.py:126} DEBUG - 3 open slots
> [2019-08-30 09:28:59,543] {base_executor.py:135} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
> [2019-08-30 09:28:59,544] {scheduler_job.py:1432} DEBUG - Ran scheduling loop in 0.00 seconds
> [2019-08-30 09:28:59,544] {scheduler_job.py:1435} DEBUG - Sleeping for 1.00 seconds
> [2019-08-30 09:29:00,545] {scheduler_job.py:1447} DEBUG - Sleeping for 1.00 seconds to prevent excessive logging
> [2019-08-30 09:29:01,544] {scheduler_job.py:1363} DEBUG - Starting Loop...
> [2019-08-30 09:29:01,545] {scheduler_job.py:1374} DEBUG - Harvesting DAG parsing results
> [2019-08-30 09:29:01,545] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:29:01,545] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:29:01,545] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:29:01,545] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:29:01,546] {scheduler_job.py:1376} DEBUG - Harvested 0 SimpleDAGs
> [2019-08-30 09:29:01,546] {scheduler_job.py:1411} DEBUG - Heartbeating the executor
> [2019-08-30 09:29:01,546] {base_executor.py:124} DEBUG - 0 running task instances
> [2019-08-30 09:29:01,546] {base_executor.py:125} DEBUG - 0 in queue
> [2019-08-30 09:29:01,547] {base_executor.py:126} DEBUG - 3 open slots
> [2019-08-30 09:29:01,547] {base_executor.py:135} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
> [2019-08-30 09:29:01,548] {scheduler_job.py:1432} DEBUG - Ran scheduling loop in 0.00 seconds
> [2019-08-30 09:29:01,548] {scheduler_job.py:1435} DEBUG - Sleeping for 1.00 seconds
> [2019-08-30 09:29:02,549] {scheduler_job.py:1447} DEBUG - Sleeping for 1.00 seconds to prevent excessive logging
> [2019-08-30 09:29:03,548] {scheduler_job.py:1363} DEBUG - Starting Loop...
> [2019-08-30 09:29:03,548] {scheduler_job.py:1374} DEBUG - Harvesting DAG parsing results
> [2019-08-30 09:29:03,548] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:29:03,549] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:29:03,549] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:29:03,549] {dag_processing.py:635} DEBUG - Received message of type DagParsingStat
> [2019-08-30 09:29:03,549] {scheduler_job.py:1376} DEBUG - Harvested 0 SimpleDAGs
> [2019-08-30 09:29:03,550] {scheduler_job.py:1411} DEBUG - Heartbeating the executor
> [2019-08-30 09:29:03,550] {base_executor.py:124} DEBUG - 0 running task instances
> [2019-08-30 09:29:03,550] {base_executor.py:125} DEBUG - 0 in queue
> [2019-08-30 09:29:03,550] {base_executor.py:126} DEBUG - 3 open slots
> [2019-08-30 09:29:03,550] {base_executor.py:135} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
> [2019-08-30 09:29:03,551] {scheduler_job.py:1423} DEBUG - Heartbeating the scheduler
> [2019-08-30 09:29:03,564] {base_job.py:197} DEBUG - [heartbeat]
> [2019-08-30 09:29:03,565] {scheduler_job.py:1432} DEBUG - Ran scheduling loop in 0.02 seconds
> [2019-08-30 09:29:03,565] {scheduler_job.py:1435} DEBUG - Sleeping for 1.00 seconds
> [2019-08-30 09:29:04,567] {scheduler_job.py:1447} DEBUG - Sleeping for 0.98 seconds to prevent excessive logging
>
>
> ps -ef | grep -i airf
> ec2-user 5648 1 0 09:05 pts/1 00:00:05 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow scheduler
> ec2-user 5727 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow scheduler
> ec2-user 5734 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow scheduler
> ec2-user 5738 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow scheduler
> ec2-user 5739 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow scheduler
> ec2-user 5746 5648 0 09:05 pts/1 00:00:02 airflow scheduler -- DagFileProcessorManager
> ec2-user 6161 1 0 09:05 pts/1 00:00:04 /home/ec2-user/venv/bin/python2.7 /home/ec2-user/venv/bin/airflow webserver -p 8080 --access_logfile /home/ec2-user/airflow/logs/airflow-webserver-access.log
> ec2-user 6243 6161 0 09:05 pts/1 00:00:01 gunicorn: master [airflow-webserver]
> ec2-user 6303 6243 0 09:05 pts/1 00:00:06 [ready] gunicorn: worker [airflow-webserver]
> ec2-user 6304 6243 0 09:05 pts/1 00:00:06 [ready] gunicorn: worker [airflow-webserver]
> ec2-user 12539 6243 0 09:16 pts/1 00:00:06 [ready] gunicorn: worker [airflow-webserver]
> ec2-user 18988 6243 0 09:26 pts/1 00:00:04 [ready] gunicorn: worker [airflow-webserver]
>
> airflow.cfg
> [core]
> dags_folder = /home/ec2-user/airflow/dags
> base_log_folder = /home/ec2-user/airflow/logs
> remote_logging = False
> remote_log_conn_id =
> remote_base_log_folder =
> encrypt_s3_logs = False
> logging_level = DEBUG
> fab_logging_level = DEBUG
> logging_config_class =
> colored_console_log = True
> colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
> colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter
> log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
> simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
> log_filename_template = {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
> log_processor_filename_template = {{ filename }}.log
> dag_processor_manager_log_location = /home/ec2-user/airflow/logs/dag_processor_manager/dag_processor_manager.log
> hostname_callable = socket:getfqdn
> default_timezone = utc
> executor = LocalExecutor
> sql_alchemy_conn_cmd = /home/ec2-user/airflow/scripts/get_db_conn_str.sh
> sql_engine_encoding = utf-8
> sql_alchemy_pool_enabled = True
> sql_alchemy_pool_size = 5
> sql_alchemy_max_overflow = 10
> sql_alchemy_pool_recycle = 1800
> sql_alchemy_reconnect_timeout = 300
> sql_alchemy_schema =
> parallelism = 3
> dag_concurrency = 3
> dags_are_paused_at_creation = False
> max_active_runs_per_dag = 350
> load_examples = False
> plugins_folder = /home/ec2-user/airflow/plugins
> fernet_key = REDACT
> donot_pickle = False
> dagbag_import_timeout = 300
> task_runner = StandardTaskRunner
> default_impersonation =
> security =
> secure_mode = True
> unit_test_mode = False
> task_log_reader = task
> enable_xcom_pickling = True
> killed_task_cleanup_time = 60
> dag_run_conf_overrides_params = False
> worker_precheck = False
> dag_discovery_safe_mode = False
> [cli]
> api_client = airflow.api.client.local_client
> endpoint_url = http://localhost:8080
> [api]
> auth_backend = airflow.api.auth.backend.deny_all
> [lineage]
> backend =
> [atlas]
> sasl_enabled = False
> host =
> port = 21000
> username =
> password =
> [operators]
> default_owner = airflow
> default_cpus = 1
> default_ram = 512
> default_disk = 512
> default_gpus = 0
> [hive]
> default_hive_mapred_queue =
> [webserver]
> base_url = https://REDACT:8080
> web_server_host = 0.0.0.0
> web_server_port = 8080
> web_server_ssl_cert = /home/ec2-user/certs/REDACT
> web_server_ssl_key = /home/ec2-user/certs/REDACT
> web_server_master_timeout = 120
> web_server_worker_timeout = 120
> worker_refresh_batch_size = 1
> worker_refresh_interval = 600
> secret_key = temporary_key
> workers = 4
> worker_class = gevent
> access_logfile = -
> error_logfile = -
> expose_config = False
> authenticate = True
> auth_backend = airflow.contrib.auth.backends.ldap_auth
> filter_by_owner = False
> owner_mode = user
> dag_default_view = tree
> dag_orientation = LR
> demo_mode = False
> log_fetch_timeout_sec = 5
> hide_paused_dags_by_default = False
> page_size = 100
> rbac = False
> navbar_color = #007A87
> default_dag_run_display_number = 25
> enable_proxy_fix = False
> cookie_secure = False
> cookie_samesite =
> default_wrap = False
> [email]
> email_backend = airflow.utils.email.send_email_smtp
> [smtp]
> smtp_host = localhost
> smtp_starttls = True
> smtp_ssl = False
> smtp_port = 25
> smtp_mail_from = [email protected]
> [celery]
> celery_app_name = airflow.executors.celery_executor
> worker_concurrency = 16
> worker_log_server_port = 8793
> broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
> result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
> flower_host = 0.0.0.0
> flower_url_prefix =
> flower_port = 5555
> flower_basic_auth =
> default_queue = default
> sync_parallelism = 0
> celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
> ssl_active = False
> ssl_key =
> ssl_cert =
> ssl_cacert =
> pool = prefork
> [celery_broker_transport_options]
> [dask]
> cluster_address = 127.0.0.1:8786
> tls_ca =
> tls_cert =
> tls_key =
> [scheduler]
> job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 25
> run_duration = -1
> min_file_process_interval = 90
> dag_dir_list_interval = 300
> print_stats_interval = 30
> scheduler_health_check_threshold = 30
> child_process_log_directory = /home/ec2-user/airflow/logs/scheduler
> scheduler_zombie_task_threshold = 300
> catchup_by_default = True
> max_tis_per_query = 2000
> statsd_on = False
> statsd_host = localhost
> statsd_port = 8125
> statsd_prefix = airflow
> max_threads = 1
> authenticate = False
> use_job_schedule = False
> [ldap]
> REDACT ldap stuff
> search_scope = SUBTREE
> ignore_malformed_schema = False
> [mesos]
> master = localhost:5050
> framework_name = Airflow
> task_cpu = 1
> task_memory = 256
> checkpoint = False
> authenticate = False
> [kerberos]
> ccache = /tmp/airflow_krb5_ccache
> principal = airflow
> reinit_frequency = 3600
> kinit_path = kinit
> keytab = airflow.keytab
> [github_enterprise]
> api_rev = v3
> [admin]
> hide_sensitive_variable_fields = True
> [elasticsearch]
> host =
> log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
> end_of_log_mark = end_of_log
> frontend =
> write_stdout = False
> json_format = False
> json_fields = asctime, filename, lineno, levelname, message
> [kubernetes]
> worker_container_repository =
> worker_container_tag =
> worker_container_image_pull_policy = IfNotPresent
> delete_worker_pods = True
> worker_pods_creation_batch_size = 1
> namespace = default
> airflow_configmap =
> dags_in_image = False
> dags_volume_subpath =
> dags_volume_claim =
> logs_volume_subpath =
> logs_volume_claim =
> dags_volume_host =
> logs_volume_host =
> env_from_configmap_ref =
> env_from_secret_ref =
> git_repo =
> git_branch =
> git_subpath =
> git_user =
> git_password =
> git_sync_root = /git
> git_sync_dest = repo
> git_dags_folder_mount_point =
> git_ssh_key_secret_name =
> git_ssh_known_hosts_configmap_name =
> git_sync_container_repository = k8s.gcr.io/git-sync
> git_sync_container_tag = v3.1.1
> git_sync_init_container_name = git-sync-clone
> worker_service_account_name =
> image_pull_secrets =
> gcp_service_account_keys =
> in_cluster = True
> affinity =
> tolerations =
> kube_client_request_args =
> run_as_user =
> fs_group =
> [kubernetes_node_selectors]
> [kubernetes_annotations]
> [kubernetes_environment_variables]
> [kubernetes_secrets]
> [kubernetes_labels]
>
> Airflow 1.10.4, LocalExecutor, Python 2, t3.large EC2
--
This message was sent by Atlassian Jira
(v8.3.4#803005)