[ https://issues.apache.org/jira/browse/AIRFLOW-5354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17045925#comment-17045925 ]

ASF subversion and git services commented on AIRFLOW-5354:
----------------------------------------------------------

Commit 7ae05a09eaf4390fff2e824f4e773dd3292f8e2f in airflow's branch 
refs/heads/master from Ash Berlin-Taylor
[ https://gitbox.apache.org/repos/asf?p=airflow.git;h=7ae05a0 ]

[AIRFLOW-5354] Reduce scheduler CPU usage from 100% (#7552)

A previous change ended up with the scheduler busily checking whether the
DagFileProcessorAgent had collected any dags. This simple change takes the
CPU usage of the scheduler from an entire core to barely anything (it
dropped below 5%).

Time for 10 dag runs of 9 dags with 108 total tasks: 50.3581s (±9.538s),
vs. 49.6910s (±7.193s) on master.

Overall performance is basically unchanged. This is a quick fix for now;
bigger changes are in store around DAG parsing anyway.
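
For context, the difference at issue is between a non-blocking check in a tight
loop and a timed wait on the same channel. The sketch below is illustrative only
(it is not the #7552 diff); it assumes a multiprocessing.Pipe like the one the
scheduler uses to talk to the DagFileProcessorAgent, and the function names are
made up.

    import multiprocessing

    # Hypothetical stand-in for the scheduler <-> processor agent channel.
    parent_conn, child_conn = multiprocessing.Pipe()

    def busy_harvest(conn):
        # Anti-pattern: poll(0) returns immediately, so the loop spins and
        # pins a whole CPU core even when no parsing results ever arrive.
        while not conn.poll(0):
            pass
        return conn.recv()

    def timed_harvest(conn, timeout=1.0):
        # Fix pattern: block for up to `timeout` seconds waiting for a
        # message; the process sleeps in the kernel instead of spinning.
        if conn.poll(timeout):
            return conn.recv()
        return None  # nothing harvested this loop

A scheduler loop that calls something like timed_harvest() once per iteration,
instead of busy-checking, stops burning CPU without changing throughput.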

> Scheduler - constant CPU usage of 25% with nothing running and scheduling 
> loop running too frequently
> -----------------------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-5354
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5354
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: scheduler
>    Affects Versions: 1.10.4
>            Reporter: t oo
>            Assignee: Ash Berlin-Taylor
>            Priority: Major
>             Fix For: 1.10.10
>
>
> I've set the logging level to debug; CPU utilisation is a constant 25% but no
> dags are running (they have no schedule, since I only trigger them externally).
> Why is the scheduling loop running every 2 seconds? Can I make it every 30
> seconds? (See the config note after the quoted issue below.)
>  
> Here is some of the scheduler log:
>  
> [2019-08-30 09:28:57,538] \{scheduler_job.py:1363} DEBUG - Starting Loop...
> [2019-08-30 09:28:57,539] \{scheduler_job.py:1374} DEBUG - Harvesting DAG 
> parsing results
> [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:57,540] \{scheduler_job.py:1376} DEBUG - Harvested 0 
> SimpleDAGs
> [2019-08-30 09:28:57,540] \{scheduler_job.py:1411} DEBUG - Heartbeating the 
> executor
> [2019-08-30 09:28:57,540] \{base_executor.py:124} DEBUG - 0 running task 
> instances
> [2019-08-30 09:28:57,540] \{base_executor.py:125} DEBUG - 0 in queue
> [2019-08-30 09:28:57,540] \{base_executor.py:126} DEBUG - 3 open slots
> [2019-08-30 09:28:57,540] \{base_executor.py:135} DEBUG - Calling the <class 
> 'airflow.executors.local_executor.LocalExecutor'> sync method
> [2019-08-30 09:28:57,541] \{scheduler_job.py:1432} DEBUG - Ran scheduling 
> loop in 0.00 seconds
> [2019-08-30 09:28:57,541] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 
> seconds
> [2019-08-30 09:28:58,543] \{scheduler_job.py:1447} DEBUG - Sleeping for 1.00 
> seconds to prevent excessive logging
> [2019-08-30 09:28:59,541] \{scheduler_job.py:1363} DEBUG - Starting Loop...
> [2019-08-30 09:28:59,541] \{scheduler_job.py:1374} DEBUG - Harvesting DAG 
> parsing results
> [2019-08-30 09:28:59,541] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:59,542] \{scheduler_job.py:1376} DEBUG - Harvested 0 
> SimpleDAGs
> [2019-08-30 09:28:59,542] \{scheduler_job.py:1411} DEBUG - Heartbeating the 
> executor
> [2019-08-30 09:28:59,542] \{base_executor.py:124} DEBUG - 0 running task 
> instances
> [2019-08-30 09:28:59,543] \{base_executor.py:125} DEBUG - 0 in queue
> [2019-08-30 09:28:59,543] \{base_executor.py:126} DEBUG - 3 open slots
> [2019-08-30 09:28:59,543] \{base_executor.py:135} DEBUG - Calling the <class 
> 'airflow.executors.local_executor.LocalExecutor'> sync method
> [2019-08-30 09:28:59,544] \{scheduler_job.py:1432} DEBUG - Ran scheduling 
> loop in 0.00 seconds
> [2019-08-30 09:28:59,544] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 
> seconds
> [2019-08-30 09:29:00,545] \{scheduler_job.py:1447} DEBUG - Sleeping for 1.00 
> seconds to prevent excessive logging
> [2019-08-30 09:29:01,544] \{scheduler_job.py:1363} DEBUG - Starting Loop...
> [2019-08-30 09:29:01,545] \{scheduler_job.py:1374} DEBUG - Harvesting DAG 
> parsing results
> [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:01,546] \{scheduler_job.py:1376} DEBUG - Harvested 0 
> SimpleDAGs
> [2019-08-30 09:29:01,546] \{scheduler_job.py:1411} DEBUG - Heartbeating the 
> executor
> [2019-08-30 09:29:01,546] \{base_executor.py:124} DEBUG - 0 running task 
> instances
> [2019-08-30 09:29:01,546] \{base_executor.py:125} DEBUG - 0 in queue
> [2019-08-30 09:29:01,547] \{base_executor.py:126} DEBUG - 3 open slots
> [2019-08-30 09:29:01,547] \{base_executor.py:135} DEBUG - Calling the <class 
> 'airflow.executors.local_executor.LocalExecutor'> sync method
> [2019-08-30 09:29:01,548] \{scheduler_job.py:1432} DEBUG - Ran scheduling 
> loop in 0.00 seconds
> [2019-08-30 09:29:01,548] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 
> seconds
> [2019-08-30 09:29:02,549] \{scheduler_job.py:1447} DEBUG - Sleeping for 1.00 
> seconds to prevent excessive logging
> [2019-08-30 09:29:03,548] \{scheduler_job.py:1363} DEBUG - Starting Loop...
> [2019-08-30 09:29:03,548] \{scheduler_job.py:1374} DEBUG - Harvesting DAG 
> parsing results
> [2019-08-30 09:29:03,548] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:03,549] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:03,549] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:03,549] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:03,549] \{scheduler_job.py:1376} DEBUG - Harvested 0 
> SimpleDAGs
> [2019-08-30 09:29:03,550] \{scheduler_job.py:1411} DEBUG - Heartbeating the 
> executor
> [2019-08-30 09:29:03,550] \{base_executor.py:124} DEBUG - 0 running task 
> instances
> [2019-08-30 09:29:03,550] \{base_executor.py:125} DEBUG - 0 in queue
> [2019-08-30 09:29:03,550] \{base_executor.py:126} DEBUG - 3 open slots
> [2019-08-30 09:29:03,550] \{base_executor.py:135} DEBUG - Calling the <class 
> 'airflow.executors.local_executor.LocalExecutor'> sync method
> [2019-08-30 09:29:03,551] \{scheduler_job.py:1423} DEBUG - Heartbeating the 
> scheduler
> [2019-08-30 09:29:03,564] \{base_job.py:197} DEBUG - [heartbeat]
> [2019-08-30 09:29:03,565] \{scheduler_job.py:1432} DEBUG - Ran scheduling 
> loop in 0.02 seconds
> [2019-08-30 09:29:03,565] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 
> seconds
> [2019-08-30 09:29:04,567] \{scheduler_job.py:1447} DEBUG - Sleeping for 0.98 
> seconds to prevent excessive logging
>  
>  
> ps -ef | grep -i airf
> ec2-user 5648 1 0 09:05 pts/1 00:00:05 /home/ec2-user/venv/bin/python2.7 
> /home/ec2-user/venv/bin/airflow scheduler
> ec2-user 5727 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 
> /home/ec2-user/venv/bin/airflow scheduler
> ec2-user 5734 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 
> /home/ec2-user/venv/bin/airflow scheduler
> ec2-user 5738 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 
> /home/ec2-user/venv/bin/airflow scheduler
> ec2-user 5739 5648 0 09:05 pts/1 00:00:00 /home/ec2-user/venv/bin/python2.7 
> /home/ec2-user/venv/bin/airflow scheduler
> ec2-user 5746 5648 0 09:05 pts/1 00:00:02 airflow scheduler -- 
> DagFileProcessorManager
> ec2-user 6161 1 0 09:05 pts/1 00:00:04 /home/ec2-user/venv/bin/python2.7 
> /home/ec2-user/venv/bin/airflow webserver -p 8080 --access_logfile 
> /home/ec2-user/airflow/logs/airflow-webserver-access.log
> ec2-user 6243 6161 0 09:05 pts/1 00:00:01 gunicorn: master 
> [airflow-webserver] 
> ec2-user 6303 6243 0 09:05 pts/1 00:00:06 [ready] gunicorn: worker 
> [airflow-webserver] 
> ec2-user 6304 6243 0 09:05 pts/1 00:00:06 [ready] gunicorn: worker 
> [airflow-webserver] 
> ec2-user 12539 6243 0 09:16 pts/1 00:00:06 [ready] gunicorn: worker 
> [airflow-webserver] 
> ec2-user 18988 6243 0 09:26 pts/1 00:00:04 [ready] gunicorn: worker 
> [airflow-webserver]
>  
> airflow.cfg
> [core]
> dags_folder = /home/ec2-user/airflow/dags
> base_log_folder = /home/ec2-user/airflow/logs
> remote_logging = False
> remote_log_conn_id =
> remote_base_log_folder =
> encrypt_s3_logs = False
> logging_level = DEBUG
> fab_logging_level = DEBUG
> logging_config_class =
> colored_console_log = True
> colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] 
> \{%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} 
> %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s
> colored_formatter_class = 
> airflow.utils.log.colored_log.CustomTTYColoredFormatter
> log_format = [%%(asctime)s] \{%%(filename)s:%%(lineno)d} %%(levelname)s - 
> %%(message)s
> simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
> log_filename_template = \{{ ti.dag_id }}/\{{ ti.task_id }}/\{{ ts }}/\{{ 
> try_number }}.log
> log_processor_filename_template = \{{ filename }}.log
> dag_processor_manager_log_location = 
> /home/ec2-user/airflow/logs/dag_processor_manager/dag_processor_manager.log
> hostname_callable = socket:getfqdn
> default_timezone = utc
> executor = LocalExecutor
> sql_alchemy_conn_cmd = /home/ec2-user/airflow/scripts/get_db_conn_str.sh
> sql_engine_encoding = utf-8
> sql_alchemy_pool_enabled = True
> sql_alchemy_pool_size = 5
> sql_alchemy_max_overflow = 10
> sql_alchemy_pool_recycle = 1800
> sql_alchemy_reconnect_timeout = 300
> sql_alchemy_schema =
> parallelism = 3
> dag_concurrency = 3
> dags_are_paused_at_creation = False
> max_active_runs_per_dag = 350
> load_examples = False
> plugins_folder = /home/ec2-user/airflow/plugins
> fernet_key = REDACT
> donot_pickle = False
> dagbag_import_timeout = 300
> task_runner = StandardTaskRunner
> default_impersonation =
> security =
> secure_mode = True
> unit_test_mode = False
> task_log_reader = task
> enable_xcom_pickling = True
> killed_task_cleanup_time = 60
> dag_run_conf_overrides_params = False
> worker_precheck = False
> dag_discovery_safe_mode = False
> [cli]
> api_client = airflow.api.client.local_client
> endpoint_url = http://localhost:8080
> [api]
> auth_backend = airflow.api.auth.backend.deny_all
> [lineage]
> backend =
> [atlas]
> sasl_enabled = False
> host =
> port = 21000
> username =
> password =
> [operators]
> default_owner = airflow
> default_cpus = 1
> default_ram = 512
> default_disk = 512
> default_gpus = 0
> [hive]
> default_hive_mapred_queue =
> [webserver]
> base_url = https://REDACT:8080
> web_server_host = 0.0.0.0
> web_server_port = 8080
> web_server_ssl_cert = /home/ec2-user/certs/REDACT
> web_server_ssl_key = /home/ec2-user/certs/REDACT
> web_server_master_timeout = 120
> web_server_worker_timeout = 120
> worker_refresh_batch_size = 1
> worker_refresh_interval = 600
> secret_key = temporary_key
> workers = 4
> worker_class = gevent
> access_logfile = -
> error_logfile = -
> expose_config = False
> authenticate = True
> auth_backend = airflow.contrib.auth.backends.ldap_auth
> filter_by_owner = False
> owner_mode = user
> dag_default_view = tree
> dag_orientation = LR
> demo_mode = False
> log_fetch_timeout_sec = 5
> hide_paused_dags_by_default = False
> page_size = 100
> rbac = False
> navbar_color = #007A87
> default_dag_run_display_number = 25
> enable_proxy_fix = False
> cookie_secure = False
> cookie_samesite =
> default_wrap = False
> [email]
> email_backend = airflow.utils.email.send_email_smtp
> [smtp]
> smtp_host = localhost
> smtp_starttls = True
> smtp_ssl = False
> smtp_port = 25
> smtp_mail_from = [email protected]
> [celery]
> celery_app_name = airflow.executors.celery_executor
> worker_concurrency = 16
> worker_log_server_port = 8793
> broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
> result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
> flower_host = 0.0.0.0
> flower_url_prefix =
> flower_port = 5555
> flower_basic_auth =
> default_queue = default
> sync_parallelism = 0
> celery_config_options = 
> airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
> ssl_active = False
> ssl_key =
> ssl_cert =
> ssl_cacert =
> pool = prefork
> [celery_broker_transport_options]
> [dask]
> cluster_address = 127.0.0.1:8786
> tls_ca =
> tls_cert =
> tls_key =
> [scheduler]
> job_heartbeat_sec = 5
> scheduler_heartbeat_sec = 25
> run_duration = -1
> min_file_process_interval = 90
> dag_dir_list_interval = 300
> print_stats_interval = 30
> scheduler_health_check_threshold = 30
> child_process_log_directory = /home/ec2-user/airflow/logs/scheduler
> scheduler_zombie_task_threshold = 300
> catchup_by_default = True
> max_tis_per_query = 2000
> statsd_on = False
> statsd_host = localhost
> statsd_port = 8125
> statsd_prefix = airflow
> max_threads = 1
> authenticate = False
> use_job_schedule = False
> [ldap]
> REDACT ldap stuff
> search_scope = SUBTREE
> ignore_malformed_schema = False
> [mesos]
> master = localhost:5050
> framework_name = Airflow
> task_cpu = 1
> task_memory = 256
> checkpoint = False
> authenticate = False
> [kerberos]
> ccache = /tmp/airflow_krb5_ccache
> principal = airflow
> reinit_frequency = 3600
> kinit_path = kinit
> keytab = airflow.keytab
> [github_enterprise]
> api_rev = v3
> [admin]
> hide_sensitive_variable_fields = True
> [elasticsearch]
> host =
> log_id_template = \{dag_id}-\{task_id}-\{execution_date}-\{try_number}
> end_of_log_mark = end_of_log
> frontend =
> write_stdout = False
> json_format = False
> json_fields = asctime, filename, lineno, levelname, message
> [kubernetes]
> worker_container_repository =
> worker_container_tag =
> worker_container_image_pull_policy = IfNotPresent
> delete_worker_pods = True
> worker_pods_creation_batch_size = 1
> namespace = default
> airflow_configmap =
> dags_in_image = False
> dags_volume_subpath =
> dags_volume_claim =
> logs_volume_subpath =
> logs_volume_claim =
> dags_volume_host =
> logs_volume_host =
> env_from_configmap_ref =
> env_from_secret_ref =
> git_repo =
> git_branch =
> git_subpath =
> git_user =
> git_password =
> git_sync_root = /git
> git_sync_dest = repo
> git_dags_folder_mount_point =
> git_ssh_key_secret_name =
> git_ssh_known_hosts_configmap_name =
> git_sync_container_repository = k8s.gcr.io/git-sync
> git_sync_container_tag = v3.1.1
> git_sync_init_container_name = git-sync-clone
> worker_service_account_name =
> image_pull_secrets =
> gcp_service_account_keys =
> in_cluster = True
> affinity =
> tolerations =
> kube_client_request_args =
> run_as_user =
> fs_group =
> [kubernetes_node_selectors]
> [kubernetes_annotations]
> [kubernetes_environment_variables]
> [kubernetes_secrets]
> [kubernetes_labels]
>  
> Airflow 1.10.4, LocalExecutor, Python 2, t3.large EC2
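
On the question above about slowing the loop down: in the 1.10.x scheduler the
sleep between loop iterations is governed by processor_poll_interval in the
[scheduler] section (it defaults to 1 second, which matches the "Sleeping for
1.00 seconds" lines in the log above). Whether raising it is a good idea is a
separate question, but as a sketch, assuming your build exposes the option:

    [scheduler]
    # seconds the scheduler sleeps between loop iterations (default 1)
    processor_poll_interval = 30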



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
