potiuk commented on code in PR #41961:
URL: https://github.com/apache/airflow/pull/41961#discussion_r1741532812
##########
performance/src/performance_dags/elastic_dag/elastic_dag.py:
##########
@@ -0,0 +1,322 @@
+"""
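+Elastic DAG copied from apache/airflow master and originally adjusted to work on Python 2
+and with Airflow versions prior to 2.0. NOTE: this copy uses f-strings and Airflow 2
+import paths (airflow.operators.bash / airflow.operators.python), so it requires
+Python 3 and Airflow 2.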
+"""
+
+import enum
+import json
+import os
+import re
+import time
+from datetime import datetime, timedelta
+from enum import Enum
+from typing import List, Union, cast
+
+from airflow import DAG
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+
# DAG file used in performance tests. Its shape can be configured by environment variables.

# Duration such as "2d8h5m20s": each of the d/h/m/s components is optional, but they
# must appear in that order. Components may be fractional, e.g. "1.5h".
RE_TIME_DELTA = re.compile(
    r"^((?P<days>[\.\d]+?)d)?((?P<hours>[\.\d]+?)h)?((?P<minutes>[\.\d]+?)m)?((?P<seconds>[\.\d]+?)s)?$"
)


def parse_time_delta(time_str):
    # type: (str) -> timedelta
    """
    Parse a time string e.g. (2h13m) into a timedelta object.

    :param time_str: A string identifying a duration. (eg. 2h13m)
    :return timedelta: The parsed duration.
    :raises AssertionError: If ``time_str`` contains no recognizable duration
        (this includes the empty string). Raised explicitly rather than via an
        ``assert`` statement so the check still runs under ``python -O``.
    """
    parts = RE_TIME_DELTA.match(time_str)
    # Every group in the pattern is optional, so "" matches as well; a match that
    # captured no component is treated as a parse failure instead of silently
    # producing a zero-length timedelta.
    time_params = (
        {name: float(param) for name, param in parts.groupdict().items() if param}
        if parts is not None
        else {}
    )
    if not time_params:
        raise AssertionError(
            "Could not parse any time information from '{time_str}'. "
            "Examples of valid strings: '8h', '2d8h5m20s', '2m4s'".format(time_str=time_str)
        )
    return timedelta(**time_params)  # type: ignore
+
+
def parse_start_date(date, start_ago):
    """
    Compute the start date of the elastic DAGs and the matching dag_id fragment.

    :param date: Optional explicit start date formatted as "%Y-%m-%d %H:%M:%S.%f".
        When falsy, ``start_ago`` is used instead.
    :param start_ago: Duration string (e.g. "2h13m") subtracted from the current
        local time when ``date`` is not given.
    :return Tuple[datetime.datetime, Union[int, str]]: The start date, and the value
        to embed in the dag_id. That value is the integer POSIX timestamp of ``date``
        when a date is given; otherwise it is ``start_ago`` unchanged.
    """
    if not date:
        # Relative start: "now" minus the configured duration; dag_id keeps the raw string.
        return datetime.now() - parse_time_delta(start_ago), start_ago

    explicit_start = datetime.strptime(date, "%Y-%m-%d %H:%M:%S.%f")
    # NOTE: explicit_start is naive, so timestamp() interprets it in the local timezone.
    return explicit_start, int(explicit_start.timestamp())
+
+
def parse_schedule_interval(time_str):
    # type: (str) -> Union[timedelta, str, None]
    """
    Parse a schedule interval string: a duration such as "2h13m", "@once", or the
    literal string "None".

    :param time_str: A string identifying a schedule interval (e.g. 2h13m, None, @once).
    :return: ``None`` for "None", the string "@once" for "@once", otherwise the
        timedelta produced by ``parse_time_delta``.
    """
    # Special values that bypass duration parsing.
    special_values = {"None": None, "@once": "@once"}
    if time_str in special_values:
        return special_values[time_str]
    return parse_time_delta(time_str)
+
+
def safe_dag_id(dag_id):
    # type: (str) -> str
    """
    Make ``dag_id`` safe to use as a DAG id.

    Every run of one or more characters outside ``[0-9a-zA-Z_]`` is replaced by a
    single underscore.
    """
    invalid_run = re.compile("[^0-9a-zA-Z_]+")
    return invalid_run.sub("_", dag_id)
+
+
def get_task_list(dag_object, operator_type_str, task_count, trigger_rule, sleep_time, operator_extra_kwargs):
    # type: (DAG, str, int, str, float, Union[dict, None]) -> List[BaseOperator]
    """
    Return the list of tasks of the test DAG.

    Each task sleeps for ``sleep_time`` seconds and then outputs "test". Task ids have
    the form ``tasks__{i}_of_{task_count}`` for ``i`` in ``1..task_count``.

    :param dag_object: A DAG object the tasks should be assigned to.
    :param operator_type_str: "bash" (BashOperator) or "python" (PythonOperator).
    :param task_count: An integer specifying the number of tasks to create.
    :param trigger_rule: A string identifying the rule by which dependencies are
        applied for the tasks to get triggered.
    :param sleep_time: A non-negative number of seconds each task sleeps when executed.
    :param operator_extra_kwargs: A dictionary with extra kwargs for every operator;
        ``None`` is treated as "no extra kwargs".
    :return List[BaseOperator]: The created tasks, in task-id order.
    :raises ValueError: If ``operator_type_str`` is not supported.
    """
    # Pick the operator class and the kwargs that differ between operator types;
    # everything else is shared by the single construction loop below.
    if operator_type_str == "bash":
        operator_class = BashOperator
        type_kwargs = {"bash_command": "sleep {sleep_time}; echo test".format(sleep_time=sleep_time)}
    elif operator_type_str == "python":

        def sleep_function():
            time.sleep(sleep_time)
            print("test")

        operator_class = PythonOperator
        type_kwargs = {"python_callable": sleep_function}
    else:
        raise ValueError(f"Unsupported operator type: {operator_type_str}.")

    extra_kwargs = operator_extra_kwargs or {}
    return [
        operator_class(
            task_id="tasks__{i}_of_{task_count}".format(i=i, task_count=task_count),
            dag=dag_object,
            trigger_rule=trigger_rule,
            **type_kwargs,
            **extra_kwargs,
        )
        for i in range(1, task_count + 1)
    ]
+
+
+def chain(*tasks):
Review Comment:
I think this was needed for the Airflow 1 -> 2 migration, when we did not have
`chain`. As far as I know, `chain` is now defined alongside `BaseOperator`
(`airflow.models.baseoperator`), so we should not need to copy it here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]