feng-tao closed pull request #3653: [AIRFLOW-2811] Fix scheduler_ops_metrics.py to work URL: https://github.com/apache/incubator-airflow/pull/3653
This is a PR merged from a forked repository. Because GitHub hides the original diff of a merged foreign pull request, the diff is reproduced below for the sake of provenance: diff --git a/scripts/perf/scheduler_ops_metrics.py b/scripts/perf/scheduler_ops_metrics.py index d4e472d34f..7928649977 100644 --- a/scripts/perf/scheduler_ops_metrics.py +++ b/scripts/perf/scheduler_ops_metrics.py @@ -17,7 +17,6 @@ # specific language governing permissions and limitations # under the License. -from datetime import datetime import logging import pandas as pd import sys @@ -25,6 +24,7 @@ from airflow import configuration, settings from airflow.jobs import SchedulerJob from airflow.models import DagBag, DagModel, DagRun, TaskInstance +from airflow.utils import timezone from airflow.utils.state import State SUBDIR = 'scripts/perf/dags' @@ -53,7 +53,10 @@ class SchedulerMetricsJob(SchedulerJob): run on remote systems and spend the majority of their time on I/O wait. To Run: - $ python scripts/perf/scheduler_ops_metrics.py + $ python scripts/perf/scheduler_ops_metrics.py [timeout] + + You can specify timeout in seconds as an optional parameter. + Its default value is 6 seconds. """ __mapper_args__ = { 'polymorphic_identity': 'SchedulerMetricsJob' @@ -71,7 +74,7 @@ def print_stats(self): .filter(TI.dag_id.in_(DAG_IDS)) .all() ) - successful_tis = filter(lambda x: x.state == State.SUCCESS, tis) + successful_tis = [x for x in tis if x.state == State.SUCCESS] ti_perf = [(ti.dag_id, ti.task_id, ti.execution_date, (ti.queued_dttm - self.start_date).total_seconds(), (ti.start_date - self.start_date).total_seconds(), @@ -117,11 +120,11 @@ def heartbeat(self): dagbag = DagBag(SUBDIR) dags = [dagbag.dags[dag_id] for dag_id in DAG_IDS] # the tasks in perf_dag_1 and per_dag_2 have a daily schedule interval.
- num_task_instances = sum([(datetime.today() - task.start_date).days + num_task_instances = sum([(timezone.utcnow() - task.start_date).days for dag in dags for task in dag.tasks]) if (len(successful_tis) == num_task_instances or - (datetime.now()-self.start_date).total_seconds() > + (timezone.utcnow() - self.start_date).total_seconds() > MAX_RUNTIME_SECS): if (len(successful_tis) == num_task_instances): self.log.info("All tasks processed! Printing stats.") @@ -178,6 +181,17 @@ def set_dags_paused_state(is_paused): def main(): + global MAX_RUNTIME_SECS + if len(sys.argv) > 1: + try: + max_runtime_secs = int(sys.argv[1]) + if max_runtime_secs < 1: + raise ValueError + MAX_RUNTIME_SECS = max_runtime_secs + except ValueError: + logging.error('Specify a positive integer for timeout.') + sys.exit(1) + configuration.load_test_config() set_dags_paused_state(False) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services