This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
     new 904709d  Check synctatic correctness for code-snippets (#16005)
904709d is described below

commit 904709d34fbe0b6062d72932b72954afe13ec148
Author: Kamil Breguła <mik-...@users.noreply.github.com>
AuthorDate: Thu May 27 07:09:25 2021 +0200

    Check synctatic correctness for code-snippets (#16005)

    * Check syntactic correctness for code-snippets
    * fixup! Check syntactic correctness for code-snippets
    * fixup! fixup! Check syntactic correctness for code-snippets
    * fixup! fixup! fixup! Check syntactic correctness for code-snippets
---
 .pre-commit-config.yaml                            |   6 +
 BREEZE.rst                                         |  10 +-
 CONTRIBUTING.rst                                   |  13 +-
 CONTRIBUTORS_QUICK_START.rst                       |   9 +-
 STATIC_CODE_CHECKS.rst                             |  17 +-
 TESTING.rst                                        |  22 +-
 UPDATING.md                                        | 165 ++++++++-----
 airflow/models/dag.py                              |   7 +-
 airflow/operators/bash.py                          |   4 +-
 airflow/operators/python.py                        |   2 +
 airflow/providers/amazon/aws/hooks/batch_client.py |   4 +-
 .../providers/amazon/aws/hooks/batch_waiters.py    |  18 +-
 .../amazon/aws/transfers/dynamodb_to_s3.py         |  20 +-
 airflow/providers/apache/beam/CHANGELOG.rst        |  14 +-
 airflow/providers/ftp/hooks/ftp.py                 |  15 +-
 airflow/providers/google/CHANGELOG.rst             |  14 +-
 .../operators/cloud_storage_transfer_service.py    |  22 +-
 .../providers/google/cloud/operators/dataflow.py   |  93 ++++----
 .../providers/google/cloud/operators/dataproc.py   |  10 +-
 airflow/providers/google/cloud/operators/gcs.py    |  12 +-
 .../providers/google/cloud/transfers/s3_to_gcs.py  |  17 +-
 .../google/cloud/utils/mlengine_operator_utils.py  |  25 +-
 .../cloud/utils/mlengine_prediction_summary.py     |  23 +-
 airflow/providers/http/hooks/http.py               |  17 +-
 airflow/providers/telegram/hooks/telegram.py       |   4 +-
 airflow/sensors/date_time.py                       |   4 +-
 airflow/utils/dates.py                             |  53 +++--
 breeze-complete                                    |   1 +
 .../apache-airflow-providers-apache-beam/index.rst |  14 +-
 .../connections/gcp.rst                            |   4 +-
 docs/apache-airflow-providers-google/index.rst     |  14 +-
 docs/apache-airflow-providers-jdbc/operators.rst   |  16 +-
 .../operators/postgres_operator_howto_guide.rst    |  50 ++--
 .../howto/create-update-providers.rst              |  18 +-
 docs/apache-airflow/best-practices.rst             |  89 +++----
 docs/apache-airflow/concepts/dags.rst              |  40 ++--
 docs/apache-airflow/concepts/operators.rst         |  56 ++---
 docs/apache-airflow/concepts/pools.rst             |   4 +-
 docs/apache-airflow/dag-run.rst                    |  25 +-
 docs/apache-airflow/dag-serialization.rst          |   1 +
 docs/apache-airflow/executor/debug.rst             |   9 +-
 docs/apache-airflow/faq.rst                        |  32 +--
 docs/apache-airflow/howto/add-dag-tags.rst         |   6 +-
 docs/apache-airflow/howto/connection.rst           |  61 ++---
 docs/apache-airflow/howto/custom-operator.rst      |  77 +++---
 .../howto/customize-state-colors-ui.rst            |  18 +-
 docs/apache-airflow/howto/define_extra_link.rst    |  36 +--
 docs/apache-airflow/howto/operator/bash.rst        |  21 +-
 docs/apache-airflow/howto/variable.rst             |   1 +
 docs/apache-airflow/kubernetes.rst                 |   3 +-
 docs/apache-airflow/lineage.rst                    |  29 ++-
 docs/apache-airflow/modules_management.rst         |   2 +-
 docs/apache-airflow/plugins.rst                    |  72 +++--
 docs/apache-airflow/security/secrets/fernet.rst    |   5 +-
 docs/apache-airflow/security/secrets/index.rst     |   3 +-
 docs/apache-airflow/timezone.rst                   |  24 +-
 docs/apache-airflow/tutorial_taskflow_api.rst      |   4 +-
 docs/apache-airflow/ui.rst                         |   2 +-
 docs/apache-airflow/upgrading-to-2.rst             | 258 ++++++++++-----------
 tests/dags_corrupted/README.md                     |   3 +-
 tests/test_utils/perf/perf_kit/__init__.py         |  24 +-
 61 files changed, 872 insertions(+), 770 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index dc5591c..b343c50 100644
---
a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -159,6 +159,12 @@ repos: - id: black args: [--config=./pyproject.toml] exclude: ^airflow/_vendor/ + - repo: https://github.com/asottile/blacken-docs + rev: v1.10.0 + hooks: + - id: blacken-docs + alias: black + additional_dependencies: [black==21.4b2] - repo: https://github.com/pre-commit/pre-commit-hooks rev: v3.4.0 hooks: diff --git a/BREEZE.rst b/BREEZE.rst index 2f0510a..09c835c 100644 --- a/BREEZE.rst +++ b/BREEZE.rst @@ -2232,11 +2232,11 @@ This is the current syntax for `./breeze <./breeze>`_: all all-but-pylint airflow-config-yaml airflow-providers-available airflow-provider-yaml-files-ok base-operator bats-tests bats-in-container-tests - black build build-providers-dependencies check-apache-license check-builtin-literals - check-executables-have-shebangs check-hooks-apply check-integrations - check-merge-conflict check-xml consistent-pylint daysago-import-check - debug-statements detect-private-key doctoc dont-use-safe-filter end-of-file-fixer - fix-encoding-pragma flake8 flynt forbid-tabs helm-lint identity + black blacken-docs build build-providers-dependencies check-apache-license + check-builtin-literals check-executables-have-shebangs check-hooks-apply + check-integrations check-merge-conflict check-xml consistent-pylint + daysago-import-check debug-statements detect-private-key doctoc dont-use-safe-filter + end-of-file-fixer fix-encoding-pragma flake8 flynt forbid-tabs helm-lint identity incorrect-use-of-LoggingMixin insert-license isort json-schema language-matters lint-dockerfile lint-openapi markdownlint mermaid mixed-line-ending mypy mypy-helm no-providers-in-core-examples no-relative-imports pre-commit-descriptions diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 1a480e2..2aa18f9 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -960,9 +960,11 @@ To make this easier there is the ``create_session`` helper: from airflow.utils.session import create_session + def my_call(*args, session): - ... - # You MUST not commit the session here. + ... + # You MUST not commit the session here. + with create_session() as session: my_call(*args, session=session) @@ -973,12 +975,11 @@ If this function is designed to be called by "end-users" (i.e. DAG authors) then from airflow.utils.session import provide_session - ... @provide_session - def my_method(arg, arg, session=None) - ... - # You SHOULD not commit the session here. The wrapper will take care of commit()/rollback() if exception + def my_method(arg, session=None): + ... + # You SHOULD not commit the session here. The wrapper will take care of commit()/rollback() if exception Don't use time() for duration calculations ----------------------------------------- diff --git a/CONTRIBUTORS_QUICK_START.rst b/CONTRIBUTORS_QUICK_START.rst index ea5f547..dd8b048 100644 --- a/CONTRIBUTORS_QUICK_START.rst +++ b/CONTRIBUTORS_QUICK_START.rst @@ -511,10 +511,11 @@ Setting up Debug .. code-block:: python - if __name__ == '__main__': - from airflow.utils.state import State - dag.clear(dag_run_state=State.NONE) - dag.run() + if __name__ == "__main__": + from airflow.utils.state import State + + dag.clear(dag_run_state=State.NONE) + dag.run() - Add ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` to Environment variable of Run Configuration. 
diff --git a/STATIC_CODE_CHECKS.rst b/STATIC_CODE_CHECKS.rst index 8a55dd0..75dc31f 100644 --- a/STATIC_CODE_CHECKS.rst +++ b/STATIC_CODE_CHECKS.rst @@ -60,6 +60,8 @@ require Breeze Docker images to be installed locally: ----------------------------------- ---------------------------------------------------------------- ------------ ``black`` Runs Black (the uncompromising Python code formatter) ----------------------------------- ---------------------------------------------------------------- ------------ +``blacken-docs`` Run black on python code blocks in documentation files +----------------------------------- ---------------------------------------------------------------- ------------ ``build`` Builds image for mypy, pylint, flake8 * ----------------------------------- ---------------------------------------------------------------- ------------ ``build-providers-dependencies`` Regenerates the JSON file with cross-provider dependencies @@ -331,15 +333,17 @@ These are guidelines for fixing errors reported by pylint: .. code-block:: python - import airflow.* # pylint: disable=wildcard-import + from airflow import * # pylint: disable=wildcard-import - If there is a single line where you need to disable a particular error, consider adding a comment to the line that causes the problem. For example: + .. code-block:: python - def MakeSummary(pcoll, metric_fn, metric_keys): # pylint: disable=invalid-name + def MakeSummary(pcoll, metric_fn, metric_keys): # pylint: disable=invalid-name + ... - For multiple lines/block of code, to disable an error, you can surround the @@ -348,10 +352,13 @@ These are guidelines for fixing errors reported by pylint: .. code-block:: python # pylint: disable=too-few-public-methods - class LoginForm(Form): + class LoginForm(Form): """Form for the user""" - username = StringField('Username', [InputRequired()]) - password = PasswordField('Password', [InputRequired()]) + + username = StringField("Username", [InputRequired()]) + password = PasswordField("Password", [InputRequired()]) + + # pylint: enable=too-few-public-methods diff --git a/TESTING.rst b/TESTING.rst index dd53a06..81b8579 100644 --- a/TESTING.rst +++ b/TESTING.rst @@ -205,6 +205,7 @@ go to the ``chart/tests`` directory and add your unit test by creating a class t .. code-block:: python class TestBaseChartTest(unittest.TestCase): + ... To render the chart create a YAML string with the nested dictionary of options you wish to test. You can then use our ``render_chart`` function to render the object of interest into a testable Python dictionary. Once the chart @@ -226,11 +227,13 @@ Example test here: class TestGitSyncScheduler(unittest.TestCase): - def test_basic(self): helm_settings = yaml.safe_load(git_sync_basic) - res = render_chart('GIT-SYNC', helm_settings, - show_only=["templates/scheduler/scheduler-deployment.yaml"]) + res = render_chart( + "GIT-SYNC", + helm_settings, + show_only=["templates/scheduler/scheduler-deployment.yaml"], + ) dep: k8s.V1Deployment = render_k8s_object(res[0], k8s.V1Deployment) assert "dags" == dep.spec.template.spec.volumes[1].name @@ -327,10 +330,10 @@ Example of the ``redis`` integration test: @pytest.mark.integration("redis") def test_real_ping(self): - hook = RedisHook(redis_conn_id='redis_default') + hook = RedisHook(redis_conn_id="redis_default") redis = hook.get_conn() - assert redis.ping(), 'Connection to Redis with PING works.' + assert redis.ping(), "Connection to Redis with PING works." 
The markers can be specified at the test level or the class level (then all tests in this class require an integration). You can add multiple markers with different integrations for tests that @@ -1270,10 +1273,11 @@ It will run a backfill job: .. code-block:: python - if __name__ == '__main__': - from airflow.utils.state import State - dag.clear(dag_run_state=State.NONE) - dag.run() + if __name__ == "__main__": + from airflow.utils.state import State + + dag.clear(dag_run_state=State.NONE) + dag.run() 2. Set up ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in the run configuration of your IDE. diff --git a/UPDATING.md b/UPDATING.md index bf98557..c7a97df 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -343,18 +343,26 @@ The old syntax of passing `context` as a dictionary will continue to work with t ```python def execution_date_fn(execution_date, ctx): + ... ``` `execution_date_fn` can take in any number of keyword arguments available in the task context dictionary. The following forms of `execution_date_fn` are all supported: ```python def execution_date_fn(dt): + ... + def execution_date_fn(execution_date): + ... + def execution_date_fn(execution_date, ds_nodash): + ... + def execution_date_fn(execution_date, ds_nodash, dag): + ... ``` ### The default value for `[webserver] cookie_samesite` has been changed to `Lax` @@ -545,6 +553,7 @@ If your plugin looked like this and was available through the `test_plugin` path def my_stat_name_handler(stat): return stat + class AirflowTestPlugin(AirflowPlugin): name = "test_plugin" stat_name_handler = my_stat_name_handler @@ -720,8 +729,8 @@ If want to use them, or your custom hook inherit them, please use ``airflow.hook Previously, you could assign a task to a DAG as follows: ```python -dag = DAG('my_dag') -dummy = DummyOperator(task_id='dummy') +dag = DAG("my_dag") +dummy = DummyOperator(task_id="dummy") dag >> dummy ``` @@ -729,8 +738,8 @@ dag >> dummy This is no longer supported. Instead, we recommend using the DAG as context manager: ```python -with DAG('my_dag') as dag: - dummy = DummyOperator(task_id='dummy') +with DAG("my_dag") as dag: + dummy = DummyOperator(task_id="dummy") ``` #### Removed deprecated import mechanism @@ -815,7 +824,8 @@ As a result, the `python_callable` argument was removed. PR: https://github.com/ def myfunc(execution_date): print(execution_date) -python_operator = PythonOperator(task_id='mytask', python_callable=myfunc, dag=dag) + +python_operator = PythonOperator(task_id="mytask", python_callable=myfunc, dag=dag) ``` Notice you don't have to set provide_context=True, variables from the task context are now automatically detected and provided. 
@@ -826,7 +836,8 @@ All context variables can still be provided with a double-asterisk argument: def myfunc(**context): print(context) # all variables will be provided to context -python_operator = PythonOperator(task_id='mytask', python_callable=myfunc) + +python_operator = PythonOperator(task_id="mytask", python_callable=myfunc) ``` The task context variable names are reserved names in the callable function, hence a clash with `op_args` and `op_kwargs` results in an exception: @@ -835,9 +846,11 @@ The task context variable names are reserved names in the callable function, hen def myfunc(dag): # raises a ValueError because "dag" is a reserved name # valid signature example: myfunc(mydag) + print("output") + python_operator = PythonOperator( - task_id='mytask', + task_id="mytask", op_args=[1], python_callable=myfunc, ) @@ -979,28 +992,34 @@ This change is caused by adding `run_type` column to `DagRun`. Previous signature: ```python -def create_dagrun(self, - run_id, - state, - execution_date=None, - start_date=None, - external_trigger=False, - conf=None, - session=None): +def create_dagrun( + self, + run_id, + state, + execution_date=None, + start_date=None, + external_trigger=False, + conf=None, + session=None, +): + ... ``` current: ```python -def create_dagrun(self, - state, - execution_date=None, - run_id=None, - start_date=None, - external_trigger=False, - conf=None, - run_type=None, - session=None): +def create_dagrun( + self, + state, + execution_date=None, + run_id=None, + start_date=None, + external_trigger=False, + conf=None, + run_type=None, + session=None, +): + ... ``` If user provides `run_id` then the `run_type` will be derived from it by checking prefix, allowed types @@ -1025,7 +1044,7 @@ Previously, there were defined in various places, example as `ID_PREFIX` class v Was: -```python +```pycon >> from airflow.models.dagrun import DagRun >> DagRun.ID_PREFIX scheduled__ @@ -1033,7 +1052,7 @@ scheduled__ Replaced by: -```python +```pycon >> from airflow.utils.types import DagRunType >> DagRunType.SCHEDULED.value scheduled @@ -1094,8 +1113,9 @@ from airflow.utils.log.logging_mixin import StreamLogWriter logger = logging.getLogger("custom-logger") -with redirect_stdout(StreamLogWriter(logger, logging.INFO)), \ - redirect_stderr(StreamLogWriter(logger, logging.WARN)): +with redirect_stdout(StreamLogWriter(logger, logging.INFO)), redirect_stderr( + StreamLogWriter(logger, logging.WARN) +): print("I Love Airflow") ``` @@ -1117,23 +1137,25 @@ are deprecated and will be removed in future versions. **Previous signature**: ```python -DagBag( +def __init__( dag_folder=None, - include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), - safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), - store_serialized_dags=False + include_examples=conf.getboolean("core", "LOAD_EXAMPLES"), + safe_mode=conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"), + store_serialized_dags=False, ): + ... ``` **current**: ```python -DagBag( +def __init__( dag_folder=None, - include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), - safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'), - read_dags_from_db=False + include_examples=conf.getboolean("core", "LOAD_EXAMPLES"), + safe_mode=conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"), + read_dags_from_db=False, ): + ... ``` If you were using positional arguments, it requires no change but if you were using keyword @@ -1626,13 +1648,23 @@ We changed signature of BigQueryGetDatasetTablesOperator. 
Before: ```python -BigQueryGetDatasetTablesOperator(dataset_id: str, dataset_resource: dict, ...) +def __init__( + dataset_id: str, + dataset_resource: dict, + # ... +): + ... ``` After: ```python -BigQueryGetDatasetTablesOperator(dataset_resource: dict, dataset_id: Optional[str] = None, ...) +def __init__( + dataset_resource: dict, + dataset_id: Optional[str] = None, + # ... +): + ... ``` ### Changes in `amazon` provider package @@ -1793,7 +1825,7 @@ For example: from airflow.providers.cloudant.hooks.cloudant import CloudantHook with CloudantHook().get_conn() as cloudant_session: - database = cloudant_session['database_name'] + database = cloudant_session["database_name"] ``` See the [docs](https://python-cloudant.readthedocs.io/en/latest/) for more information on how to use the new cloudant version. @@ -2101,8 +2133,8 @@ back was ``None``. This will now return an empty string (`'''`) Example: ```python ->> Variable.set('test_key', '') ->> Variable.get('test_key') +Variable.set("test_key", "") +Variable.get("test_key") ``` The above code returned `None` previously, now it will return `''`. @@ -2168,8 +2200,8 @@ the `attr` argument is no longer required (or accepted). In order to use this function in subclasses of the `BaseOperator`, the `attr` argument must be removed: ```python -result = self.render_template('myattr', self.myattr, context) # Pre-1.10.6 call -... +result = self.render_template("myattr", self.myattr, context) # Pre-1.10.6 call +# ... result = self.render_template(self.myattr, context) # Post-1.10.6 call ``` @@ -2258,6 +2290,7 @@ Old signature: ```python def get_task_instances(self, session, start_date=None, end_date=None): + ... ``` New signature: @@ -2265,6 +2298,7 @@ New signature: ```python @provide_session def get_task_instances(self, start_date=None, end_date=None, session=None): + ... ``` #### For `DAG` @@ -2272,16 +2306,16 @@ def get_task_instances(self, start_date=None, end_date=None, session=None): Old signature: ```python -def get_task_instances( - self, session, start_date=None, end_date=None, state=None): +def get_task_instances(self, session, start_date=None, end_date=None, state=None): + ... ``` New signature: ```python @provide_session -def get_task_instances( - self, start_date=None, end_date=None, state=None, session=None): +def get_task_instances(self, start_date=None, end_date=None, state=None, session=None): + ... ``` In either case, it is necessary to rewrite calls to the `get_task_instances` method that currently provide the `session` positional argument. New calls to this method look like: @@ -2413,23 +2447,25 @@ Old signature: ```python def create_transfer_job(self, description, schedule, transfer_spec, project_id=None): + ... ``` New signature: ```python def create_transfer_job(self, body): + ... ``` It is necessary to rewrite calls to method. The new call looks like this: ```python body = { - 'status': 'ENABLED', - 'projectId': project_id, - 'description': description, - 'transferSpec': transfer_spec, - 'schedule': schedule, + "status": "ENABLED", + "projectId": project_id, + "description": description, + "transferSpec": transfer_spec, + "schedule": schedule, } gct_hook.create_transfer_job(body) ``` @@ -2444,12 +2480,16 @@ Old signature: ```python def wait_for_transfer_job(self, job): + ... ``` New signature: ```python -def wait_for_transfer_job(self, job, expected_statuses=(GcpTransferOperationStatus.SUCCESS, )): +def wait_for_transfer_job( + self, job, expected_statuses=(GcpTransferOperationStatus.SUCCESS,) +): + ... 
``` The behavior of `wait_for_transfer_job` has changed: @@ -2542,7 +2582,7 @@ previously you had this in your user class ```python def is_active(self): - return self.active + return self.active ``` then you need to change it like this @@ -2550,7 +2590,7 @@ then you need to change it like this ```python @property def is_active(self): - return self.active + return self.active ``` ### Support autodetected schemas to GoogleCloudStorageToBigQueryOperator @@ -2563,24 +2603,27 @@ define a schema_fields: ```python gcs_to_bq.GoogleCloudStorageToBigQueryOperator( - ... - schema_fields={...}) + # ... + schema_fields={...} +) ``` or define a schema_object: ```python gcs_to_bq.GoogleCloudStorageToBigQueryOperator( - ... - schema_object='path/to/schema/object') + # ... + schema_object="path/to/schema/object" +) ``` or enabled autodetect of schema: ```python gcs_to_bq.GoogleCloudStorageToBigQueryOperator( - ... - autodetect=True) + # ... + autodetect=True +) ``` ## Airflow 1.10.1 diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 1cf3797..5433ad5 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -2378,11 +2378,12 @@ class DagContext: .. code-block:: python with DAG( - dag_id='example_dag', + dag_id="example_dag", default_args=default_args, - schedule_interval='0 0 * * *', - dagrun_timeout=timedelta(minutes=60) + schedule_interval="0 0 * * *", + dagrun_timeout=timedelta(minutes=60), ) as dag: + ... If you do this the context stores the DAG and whenever new task is created, it will use such stored DAG as the parent DAG. diff --git a/airflow/operators/bash.py b/airflow/operators/bash.py index 66a70a9..44e1aa2 100644 --- a/airflow/operators/bash.py +++ b/airflow/operators/bash.py @@ -113,8 +113,8 @@ class BashOperator(BaseOperator): bash_task = BashOperator( task_id="bash_task", - bash_command='echo "here is the message: \'$message\'"', - env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'}, + bash_command="echo \"here is the message: '$message'\"", + env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'}, ) """ diff --git a/airflow/operators/python.py b/airflow/operators/python.py index fa8020c..83d1018 100644 --- a/airflow/operators/python.py +++ b/airflow/operators/python.py @@ -456,6 +456,8 @@ def get_current_context() -> Dict[str, Any]: .. code:: python from airflow.operators.python import get_current_context + + def my_task(): context = get_current_context() ti = context["ti"] diff --git a/airflow/providers/amazon/aws/hooks/batch_client.py b/airflow/providers/amazon/aws/hooks/batch_client.py index 6aa7156..fd54545 100644 --- a/airflow/providers/amazon/aws/hooks/batch_client.py +++ b/airflow/providers/amazon/aws/hooks/batch_client.py @@ -89,7 +89,8 @@ class AwsBatchProtocol(Protocol): .. code-block:: python import boto3 - boto3.client('batch').waiter_names == [] + + boto3.client("batch").waiter_names == [] .. seealso:: @@ -511,6 +512,7 @@ class AwsBatchClientHook(AwsBaseHook): delay = min(max_interval, delay) print(delay / 3, delay) + for tries in range(10): exp(tries) diff --git a/airflow/providers/amazon/aws/hooks/batch_waiters.py b/airflow/providers/amazon/aws/hooks/batch_waiters.py index 9b739dc..ef5f8d3 100644 --- a/airflow/providers/amazon/aws/hooks/batch_waiters.py +++ b/airflow/providers/amazon/aws/hooks/batch_waiters.py @@ -44,7 +44,6 @@ class AwsBatchWaitersHook(AwsBatchClientHook): """ A utility to manage waiters for AWS batch services. - Examples: .. 
code-block:: python import random @@ -60,7 +59,7 @@ class AwsBatchWaitersHook(AwsBatchClientHook): # modify custom_config['waiters'] as necessary and get a new instance: waiters = AwsBatchWaiters(waiter_config=custom_config) waiters.waiter_config # check the custom configuration (this is a deepcopy) - waiters.list_waiters() # names of custom waiters + waiters.list_waiters() # names of custom waiters # During the init for AwsBatchWaiters, the waiter_config is used to build a waiter_model; # and note that this only occurs during the class init, to avoid any accidental mutations @@ -71,12 +70,16 @@ class AwsBatchWaitersHook(AwsBatchClientHook): # and the details of the config on that waiter can be further modified without any # accidental impact on the generation of new waiters from the defined waiter_model, e.g. waiters.get_waiter("JobExists").config.delay # -> 5 - waiter = waiters.get_waiter("JobExists") # -> botocore.waiter.Batch.Waiter.JobExists object + waiter = waiters.get_waiter( + "JobExists" + ) # -> botocore.waiter.Batch.Waiter.JobExists object waiter.config.delay = 10 waiters.get_waiter("JobExists").config.delay # -> 5 as defined by waiter_model # To use a specific waiter, update the config and call the `wait()` method for jobId, e.g. - waiter = waiters.get_waiter("JobExists") # -> botocore.waiter.Batch.Waiter.JobExists object + waiter = waiters.get_waiter( + "JobExists" + ) # -> botocore.waiter.Batch.Waiter.JobExists object waiter.config.delay = random.uniform(1, 10) # seconds waiter.config.max_attempts = 10 waiter.wait(jobs=[jobId]) @@ -154,7 +157,8 @@ class AwsBatchWaitersHook(AwsBatchClientHook): The ``.waiter_model`` is combined with the ``.client`` to get a specific waiter and the properties of that waiter can be modified without any accidental impact on the generation of new waiters from the ``.waiter_model``, e.g. - .. code-block:: + + .. code-block:: python waiters.get_waiter("JobExists").config.delay # -> 5 waiter = waiters.get_waiter("JobExists") # a new waiter object @@ -162,9 +166,11 @@ class AwsBatchWaitersHook(AwsBatchClientHook): waiters.get_waiter("JobExists").config.delay # -> 5 as defined by waiter_model To use a specific waiter, update the config and call the `wait()` method for jobId, e.g. - .. code-block:: + + .. code-block:: python import random + waiter = waiters.get_waiter("JobExists") # a new waiter object waiter.config.delay = random.uniform(1, 10) # seconds waiter.config.max_attempts = 10 diff --git a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py index 7e0e410..9bc467a 100644 --- a/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py +++ b/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py @@ -63,23 +63,23 @@ class DynamoDBToS3Operator(BaseOperator): .. code-block:: python op1 = DynamoDBToS3Operator( - task_id='replicator-1', - dynamodb_table_name='hello', + task_id="replicator-1", + dynamodb_table_name="hello", dynamodb_scan_kwargs={ - 'TotalSegments': 2, - 'Segment': 0, + "TotalSegments": 2, + "Segment": 0, }, - ... + ..., ) op2 = DynamoDBToS3Operator( - task_id='replicator-2', - dynamodb_table_name='hello', + task_id="replicator-2", + dynamodb_table_name="hello", dynamodb_scan_kwargs={ - 'TotalSegments': 2, - 'Segment': 1, + "TotalSegments": 2, + "Segment": 1, }, - ... 
+ ..., ) :param dynamodb_table_name: Dynamodb table to replicate data from diff --git a/airflow/providers/apache/beam/CHANGELOG.rst b/airflow/providers/apache/beam/CHANGELOG.rst index b48aaf5..ed98e31 100644 --- a/airflow/providers/apache/beam/CHANGELOG.rst +++ b/airflow/providers/apache/beam/CHANGELOG.rst @@ -51,17 +51,19 @@ This is the extra for the ``google`` provider: .. code-block:: python - extras_require={ - ... - 'apache.beam': ['apache-airflow-providers-apache-beam', 'apache-beam[gcp]'], - .... - }, + extras_require = ( + { + # ... + "apache.beam": ["apache-airflow-providers-apache-beam", "apache-beam[gcp]"], + # ... + }, + ) And likewise this is the extra for the ``apache.beam`` provider: .. code-block:: python - extras_require={'google': ['apache-airflow-providers-google', 'apache-beam[gcp]']}, + extras_require = ({"google": ["apache-airflow-providers-google", "apache-beam[gcp]"]},) You can still run this with PIP version <= 20.2.4 and go back to the previous behaviour: diff --git a/airflow/providers/ftp/hooks/ftp.py b/airflow/providers/ftp/hooks/ftp.py index a03e461..ac35aa4 100644 --- a/airflow/providers/ftp/hooks/ftp.py +++ b/airflow/providers/ftp/hooks/ftp.py @@ -143,23 +143,28 @@ class FTPHook(BaseHook): .. code-block:: python - hook = FTPHook(ftp_conn_id='my_conn') + hook = FTPHook(ftp_conn_id="my_conn") - remote_path = '/path/to/remote/file' - local_path = '/path/to/local/file' + remote_path = "/path/to/remote/file" + local_path = "/path/to/local/file" # with a custom callback (in this case displaying progress on each read) def print_progress(percent_progress): - self.log.info('Percent Downloaded: %s%%' % percent_progress) + self.log.info("Percent Downloaded: %s%%" % percent_progress) + total_downloaded = 0 total_file_size = hook.get_size(remote_path) - output_handle = open(local_path, 'wb') + output_handle = open(local_path, "wb") + + def write_to_file_with_progress(data): total_downloaded += len(data) output_handle.write(data) percent_progress = (total_downloaded / total_file_size) * 100 print_progress(percent_progress) + + hook.retrieve_file(remote_path, None, callback=write_to_file_with_progress) # without a custom callback data is written to the local_path diff --git a/airflow/providers/google/CHANGELOG.rst b/airflow/providers/google/CHANGELOG.rst index 4d2f5a1..aa4f7d8 100644 --- a/airflow/providers/google/CHANGELOG.rst +++ b/airflow/providers/google/CHANGELOG.rst @@ -57,17 +57,19 @@ This is the extra for the ``google`` provider: .. code-block:: python - extras_require={ - ... - 'apache.beam': ['apache-airflow-providers-apache-beam', 'apache-beam[gcp]'], - .... - }, + extras_require = ( + { + # ... + "apache.beam": ["apache-airflow-providers-apache-beam", "apache-beam[gcp]"], + # ... + }, + ) And likewise this is the extra for the ``apache.beam`` provider: .. 
code-block:: python - extras_require={'google': ['apache-airflow-providers-google', 'apache-beam[gcp]']}, + extras_require = ({"google": ["apache-airflow-providers-google", "apache-beam[gcp]"]},) You can still run this with PIP version <= 20.2.4 and go back to the previous behaviour: diff --git a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py index 6c803ef..09794c7 100644 --- a/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py +++ b/airflow/providers/google/cloud/operators/cloud_storage_transfer_service.py @@ -751,11 +751,12 @@ class CloudDataTransferServiceS3ToGCSOperator(BaseOperator): .. code-block:: python s3_to_gcs_transfer_op = S3ToGoogleCloudStorageTransferOperator( - task_id='s3_to_gcs_transfer_example', - s3_bucket='my-s3-bucket', - project_id='my-gcp-project', - gcs_bucket='my-gcs-bucket', - dag=my_dag) + task_id="s3_to_gcs_transfer_example", + s3_bucket="my-s3-bucket", + project_id="my-gcp-project", + gcs_bucket="my-gcs-bucket", + dag=my_dag, + ) :param s3_bucket: The S3 bucket where to find the objects. (templated) :type s3_bucket: str @@ -921,11 +922,12 @@ class CloudDataTransferServiceGCSToGCSOperator(BaseOperator): .. code-block:: python gcs_to_gcs_transfer_op = GoogleCloudStorageToGoogleCloudStorageTransferOperator( - task_id='gcs_to_gcs_transfer_example', - source_bucket='my-source-bucket', - destination_bucket='my-destination-bucket', - project_id='my-gcp-project', - dag=my_dag) + task_id="gcs_to_gcs_transfer_example", + source_bucket="my-source-bucket", + destination_bucket="my-destination-bucket", + project_id="my-gcp-project", + dag=my_dag, + ) :param source_bucket: The source Google Cloud Storage bucket where the object is. 
(templated) diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py index cb31c11..010c9b6 100644 --- a/airflow/providers/google/cloud/operators/dataflow.py +++ b/airflow/providers/google/cloud/operators/dataflow.py @@ -181,36 +181,36 @@ class DataflowCreateJavaJobOperator(BaseOperator): **Example**: :: default_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'start_date': - (2016, 8, 1), - 'email': ['a...@vanboxel.be'], - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=30), - 'dataflow_default_options': { - 'project': 'my-gcp-project', - 'zone': 'us-central1-f', - 'stagingLocation': 'gs://bucket/tmp/dataflow/staging/', - } + "owner": "airflow", + "depends_on_past": False, + "start_date": (2016, 8, 1), + "email": ["a...@vanboxel.be"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=30), + "dataflow_default_options": { + "project": "my-gcp-project", + "zone": "us-central1-f", + "stagingLocation": "gs://bucket/tmp/dataflow/staging/", + }, } - dag = DAG('test-dag', default_args=default_args) + dag = DAG("test-dag", default_args=default_args) task = DataflowCreateJavaJobOperator( - gcp_conn_id='gcp_default', - task_id='normalize-cal', - jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar', + gcp_conn_id="gcp_default", + task_id="normalize-cal", + jar="{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar", options={ - 'autoscalingAlgorithm': 'BASIC', - 'maxNumWorkers': '50', - 'start': '{{ds}}', - 'partitionType': 'DAY' - + "autoscalingAlgorithm": "BASIC", + "maxNumWorkers": "50", + "start": "{{ds}}", + "partitionType": "DAY", }, - dag=dag) + dag=dag, + ) + .. seealso:: For more detail on job submission have a look at the reference: @@ -316,9 +316,9 @@ class DataflowCreateJavaJobOperator(BaseOperator): .. code-block:: python default_args = { - 'dataflow_default_options': { - 'zone': 'europe-west1-d', - 'stagingLocation': 'gs://my-staging-bucket/staging/' + "dataflow_default_options": { + "zone": "europe-west1-d", + "stagingLocation": "gs://my-staging-bucket/staging/", } } @@ -330,17 +330,18 @@ class DataflowCreateJavaJobOperator(BaseOperator): .. code-block:: python t1 = DataflowCreateJavaJobOperator( - task_id='dataflow_example', - jar='{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar', + task_id="dataflow_example", + jar="{{var.value.gcp_dataflow_base}}pipeline/build/libs/pipeline-example-1.0.jar", options={ - 'autoscalingAlgorithm': 'BASIC', - 'maxNumWorkers': '50', - 'start': '{{ds}}', - 'partitionType': 'DAY', - 'labels': {'foo' : 'bar'} + "autoscalingAlgorithm": "BASIC", + "maxNumWorkers": "50", + "start": "{{ds}}", + "partitionType": "DAY", + "labels": {"foo": "bar"}, }, - gcp_conn_id='airflow-conn-id', - dag=my-dag) + gcp_conn_id="airflow-conn-id", + dag=my - dag, + ) """ @@ -583,10 +584,9 @@ class DataflowTemplatedJobStartOperator(BaseOperator): .. code-block:: python default_args = { - 'dataflow_default_options': { - 'zone': 'europe-west1-d', - 'tempLocation': 'gs://my-staging-bucket/staging/', - } + "dataflow_default_options": { + "zone": "europe-west1-d", + "tempLocation": "gs://my-staging-bucket/staging/", } } @@ -597,14 +597,15 @@ class DataflowTemplatedJobStartOperator(BaseOperator): .. 
code-block:: python t1 = DataflowTemplatedJobStartOperator( - task_id='dataflow_example', - template='{{var.value.gcp_dataflow_base}}', + task_id="dataflow_example", + template="{{var.value.gcp_dataflow_base}}", parameters={ - 'inputFile': "gs://bucket/input/my_input.txt", - 'outputFile': "gs://bucket/output/my_output.txt" + "inputFile": "gs://bucket/input/my_input.txt", + "outputFile": "gs://bucket/output/my_output.txt", }, - gcp_conn_id='airflow-conn-id', - dag=my-dag) + gcp_conn_id="airflow-conn-id", + dag=my - dag, + ) ``template``, ``dataflow_default_options``, ``parameters``, and ``job_name`` are templated so you can use variables in them. diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index fea5acc8..6952139 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -1108,11 +1108,11 @@ class DataprocSubmitPigJobOperator(DataprocJobBaseOperator): .. code-block:: python default_args = { - 'cluster_name': 'cluster-1', - 'dataproc_pig_jars': [ - 'gs://example/udf/jar/datafu/1.2.0/datafu.jar', - 'gs://example/udf/jar/gpig/1.2/gpig.jar' - ] + "cluster_name": "cluster-1", + "dataproc_pig_jars": [ + "gs://example/udf/jar/datafu/1.2.0/datafu.jar", + "gs://example/udf/jar/gpig/1.2/gpig.jar", + ], } You can pass a pig script as string or file reference. Use variables to pass on diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py index 5fa1eb2..f38bea9 100644 --- a/airflow/providers/google/cloud/operators/gcs.py +++ b/airflow/providers/google/cloud/operators/gcs.py @@ -96,12 +96,12 @@ class GCSCreateBucketOperator(BaseOperator): .. code-block:: python CreateBucket = GoogleCloudStorageCreateBucketOperator( - task_id='CreateNewBucket', - bucket_name='test-bucket', - storage_class='MULTI_REGIONAL', - location='EU', - labels={'env': 'dev', 'team': 'airflow'}, - gcp_conn_id='airflow-conn-id' + task_id="CreateNewBucket", + bucket_name="test-bucket", + storage_class="MULTI_REGIONAL", + location="EU", + labels={"env": "dev", "team": "airflow"}, + gcp_conn_id="airflow-conn-id", ) """ diff --git a/airflow/providers/google/cloud/transfers/s3_to_gcs.py b/airflow/providers/google/cloud/transfers/s3_to_gcs.py index 81cd6c4..4787582 100644 --- a/airflow/providers/google/cloud/transfers/s3_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/s3_to_gcs.py @@ -87,14 +87,15 @@ class S3ToGCSOperator(S3ListOperator): .. code-block:: python s3_to_gcs_op = S3ToGCSOperator( - task_id='s3_to_gcs_example', - bucket='my-s3-bucket', - prefix='data/customers-201804', - dest_gcs_conn_id='google_cloud_default', - dest_gcs='gs://my.gcs.bucket/some/customers/', - replace=False, - gzip=True, - dag=my-dag) + task_id="s3_to_gcs_example", + bucket="my-s3-bucket", + prefix="data/customers-201804", + dest_gcs_conn_id="google_cloud_default", + dest_gcs="gs://my.gcs.bucket/some/customers/", + replace=False, + gzip=True, + dag=my - dag, + ) Note that ``bucket``, ``prefix``, ``delimiter`` and ``dest_gcs`` are templated, so you can use variables in them if you wish. 
diff --git a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py index bd292c9..f99fc33 100644 --- a/airflow/providers/google/cloud/utils/mlengine_operator_utils.py +++ b/airflow/providers/google/cloud/utils/mlengine_operator_utils.py @@ -101,21 +101,24 @@ def create_evaluate_ops( # pylint: disable=too-many-arguments def get_metric_fn_and_keys(): import math # imports should be outside of the metric_fn below. + def error_and_squared_error(inst): - label = float(inst['input_label']) - classes = float(inst['classes']) # 0 or 1 - err = abs(classes-label) - squared_err = math.pow(classes-label, 2) + label = float(inst["input_label"]) + classes = float(inst["classes"]) # 0 or 1 + err = abs(classes - label) + squared_err = math.pow(classes - label, 2) return (err, squared_err) # returns a tuple. - return error_and_squared_error, ['err', 'mse'] # key order must match. + + return error_and_squared_error, ["err", "mse"] # key order must match. + def validate_err_and_count(summary): - if summary['err'] > 0.2: - raise ValueError('Too high err>0.2; summary=%s' % summary) - if summary['mse'] > 0.05: - raise ValueError('Too high mse>0.05; summary=%s' % summary) - if summary['count'] < 1000: - raise ValueError('Too few instances<1000; summary=%s' % summary) + if summary["err"] > 0.2: + raise ValueError("Too high err>0.2; summary=%s" % summary) + if summary["mse"] > 0.05: + raise ValueError("Too high mse>0.05; summary=%s" % summary) + if summary["count"] < 1000: + raise ValueError("Too few instances<1000; summary=%s" % summary) return summary For the details on the other BatchPrediction-related arguments (project_id, diff --git a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py index d95d8a6..273e5ef 100644 --- a/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py +++ b/airflow/providers/google/cloud/utils/mlengine_prediction_summary.py @@ -89,16 +89,19 @@ To test outside of the dag: .. code-block:: python - subprocess.check_call(["python", - "-m", - "airflow.providers.google.cloud.utils.mlengine_prediction_summary", - "--prediction_path=gs://...", - "--metric_fn_encoded=" + metric_fn_encoded, - "--metric_keys=log_loss,mse", - "--runner=DataflowRunner", - "--staging_location=gs://...", - "--temp_location=gs://...", - ]) + subprocess.check_call( + [ + "python", + "-m", + "airflow.providers.google.cloud.utils.mlengine_prediction_summary", + "--prediction_path=gs://...", + "--metric_fn_encoded=" + metric_fn_encoded, + "--metric_keys=log_loss,mse", + "--runner=DataflowRunner", + "--staging_location=gs://...", + "--temp_location=gs://...", + ] + ) """ import argparse diff --git a/airflow/providers/http/hooks/http.py b/airflow/providers/http/hooks/http.py index 6cf5f8e..029f5a2 100644 --- a/airflow/providers/http/hooks/http.py +++ b/airflow/providers/http/hooks/http.py @@ -170,7 +170,7 @@ class HttpHook(BaseHook): :param prepped_request: the prepared request generated in run() :type prepped_request: session.prepare_request :param extra_options: additional options to be used when executing the request - i.e. {'check_response': False} to avoid checking raising exceptions on non 2XX + i.e. ``{'check_response': False}`` to avoid checking raising exceptions on non 2XX or 3XX status codes :type extra_options: dict """ @@ -215,16 +215,13 @@ class HttpHook(BaseHook): .. 
code-block:: python - hook = HttpHook(http_conn_id='my_conn',method='GET') + hook = HttpHook(http_conn_id="my_conn", method="GET") retry_args = dict( - wait=tenacity.wait_exponential(), - stop=tenacity.stop_after_attempt(10), - retry=requests.exceptions.ConnectionError - ) - hook.run_with_advanced_retry( - endpoint='v1/test', - _retry_args=retry_args - ) + wait=tenacity.wait_exponential(), + stop=tenacity.stop_after_attempt(10), + retry=requests.exceptions.ConnectionError, + ) + hook.run_with_advanced_retry(endpoint="v1/test", _retry_args=retry_args) """ self._retry_obj = tenacity.Retrying(**_retry_args) diff --git a/airflow/providers/telegram/hooks/telegram.py b/airflow/providers/telegram/hooks/telegram.py index 8d5e803..a4e2c83 100644 --- a/airflow/providers/telegram/hooks/telegram.py +++ b/airflow/providers/telegram/hooks/telegram.py @@ -43,12 +43,12 @@ class TelegramHook(BaseHook): .. code-block:: python # Create hook - telegram_hook = TelegramHook(telegram_conn_id='telegram_default') + telegram_hook = TelegramHook(telegram_conn_id="telegram_default") # or telegram_hook = TelegramHook(telegram_conn_id='telegram_default', chat_id='-1xxx') # or telegram_hook = TelegramHook(token='xxx:xxx', chat_id='-1xxx') # Call method from telegram bot client - telegram_hook.send_message(None', {"text": "message", "chat_id": "-1xxx"}) + telegram_hook.send_message(None, {"text": "message", "chat_id": "-1xxx"}) # or telegram_hook.send_message(None', {"text": "message"}) :param telegram_conn_id: connection that optionally has Telegram API token in the password field diff --git a/airflow/sensors/date_time.py b/airflow/sensors/date_time.py index 3229371..7afb421 100644 --- a/airflow/sensors/date_time.py +++ b/airflow/sensors/date_time.py @@ -45,8 +45,8 @@ class DateTimeSensor(BaseSensorOperator): .. code-block:: python DateTimeSensor( - task_id='wait_for_0100', - target_time='{{ next_execution_date.tomorrow().replace(hour=1) }}', + task_id="wait_for_0100", + target_time="{{ next_execution_date.tomorrow().replace(hour=1) }}", ) :param target_time: datetime after which the job succeeds. (templated) diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py index de5e52b..2f908b1 100644 --- a/airflow/utils/dates.py +++ b/airflow/utils/dates.py @@ -46,17 +46,21 @@ def date_range( can be something that can be added to `datetime.datetime` or a cron expression as a `str` - .. code-block:: python - - date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1)) - [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), - datetime.datetime(2016, 1, 3, 0, 0)] - date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta='0 0 * * *') - [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 1, 2, 0, 0), - datetime.datetime(2016, 1, 3, 0, 0)] - date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *") - [datetime.datetime(2016, 1, 1, 0, 0), datetime.datetime(2016, 2, 1, 0, 0), - datetime.datetime(2016, 3, 1, 0, 0)] + .. 
code-block:: pycon + >>> from airflow.utils.dates import datterange + >>> from datetime import datetime, timedelta + >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta=timedelta(1)) + [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), + datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')), + datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))] + >>> date_range(datetime(2016, 1, 1), datetime(2016, 1, 3), delta="0 0 * * *") + [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), + datetime.datetime(2016, 1, 2, 0, 0, tzinfo=Timezone('UTC')), + datetime.datetime(2016, 1, 3, 0, 0, tzinfo=Timezone('UTC'))] + >>> date_range(datetime(2016, 1, 1), datetime(2016, 3, 3), delta="0 0 0 * *") + [datetime.datetime(2016, 1, 1, 0, 0, tzinfo=Timezone('UTC')), + datetime.datetime(2016, 2, 1, 0, 0, tzinfo=Timezone('UTC')), + datetime.datetime(2016, 3, 1, 0, 0, tzinfo=Timezone('UTC'))] :param start_date: anchor date to start the series from :type start_date: datetime.datetime @@ -134,18 +138,21 @@ def round_time(dt, delta, start_date=timezone.make_aware(datetime.min)): Returns the datetime of the form start_date + i * delta which is closest to dt for any non-negative integer i. Note that delta may be a datetime.timedelta or a dateutil.relativedelta - >>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1)) - datetime.datetime(2015, 1, 1, 0, 0) - >>> round_time(datetime(2015, 1, 2), relativedelta(months=1)) - datetime.datetime(2015, 1, 1, 0, 0) - >>> round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) - datetime.datetime(2015, 9, 16, 0, 0) - >>> round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) - datetime.datetime(2015, 9, 15, 0, 0) - >>> round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) - datetime.datetime(2015, 9, 14, 0, 0) - >>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) - datetime.datetime(2015, 9, 14, 0, 0) + + .. code-block:: pycon + + >>> round_time(datetime(2015, 1, 1, 6), timedelta(days=1)) + datetime.datetime(2015, 1, 1, 0, 0) + >>> round_time(datetime(2015, 1, 2), relativedelta(months=1)) + datetime.datetime(2015, 1, 1, 0, 0) + >>> round_time(datetime(2015, 9, 16, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) + datetime.datetime(2015, 9, 16, 0, 0) + >>> round_time(datetime(2015, 9, 15, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) + datetime.datetime(2015, 9, 15, 0, 0) + >>> round_time(datetime(2015, 9, 14, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) + datetime.datetime(2015, 9, 14, 0, 0) + >>> round_time(datetime(2015, 9, 13, 0, 0), timedelta(1), datetime(2015, 9, 14, 0, 0)) + datetime.datetime(2015, 9, 14, 0, 0) """ if isinstance(delta, str): # It's cron based, so it's easy diff --git a/breeze-complete b/breeze-complete index b0985eb..dd4bbf3 100644 --- a/breeze-complete +++ b/breeze-complete @@ -85,6 +85,7 @@ base-operator bats-tests bats-in-container-tests black +blacken-docs build build-providers-dependencies check-apache-license diff --git a/docs/apache-airflow-providers-apache-beam/index.rst b/docs/apache-airflow-providers-apache-beam/index.rst index e3e6a9e..cbc8bd9 100644 --- a/docs/apache-airflow-providers-apache-beam/index.rst +++ b/docs/apache-airflow-providers-apache-beam/index.rst @@ -146,17 +146,19 @@ This is the extra for the ``google`` provider: .. code-block:: python - extras_require={ - ... - 'apache.beam': ['apache-airflow-providers-apache-beam', 'apache-beam[gcp]'], - .... 
- }, + extras_require = ( + { + # ... + "apache.beam": ["apache-airflow-providers-apache-beam", "apache-beam[gcp]"], + # .... + }, + ) And likewise this is the extra for the ``apache.beam`` provider: .. code-block:: python - extras_require={'google': ['apache-airflow-providers-google', 'apache-beam[gcp]']}, + extras_require = ({"google": ["apache-airflow-providers-google", "apache-beam[gcp]"]},) You can still run this with PIP version <= 20.2.4 and go back to the previous behaviour: diff --git a/docs/apache-airflow-providers-google/connections/gcp.rst b/docs/apache-airflow-providers-google/connections/gcp.rst index 7cdc8d1..888ebc4 100644 --- a/docs/apache-airflow-providers-google/connections/gcp.rst +++ b/docs/apache-airflow-providers-google/connections/gcp.rst @@ -139,7 +139,9 @@ For example: import os - from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator + from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryCreateEmptyDatasetOperator, + ) IMPERSONATION_CHAIN = "impersonated_account@your_project_id.iam.gserviceaccount.com" diff --git a/docs/apache-airflow-providers-google/index.rst b/docs/apache-airflow-providers-google/index.rst index 2f3230a..64392a4 100644 --- a/docs/apache-airflow-providers-google/index.rst +++ b/docs/apache-airflow-providers-google/index.rst @@ -211,17 +211,19 @@ This is the extra for the ``google`` provider: .. code-block:: python - extras_require={ - ... - 'apache.beam': ['apache-airflow-providers-apache-beam', 'apache-beam[gcp]'], - .... - }, + extras_require = ( + { + # ... + "apache.beam": ["apache-airflow-providers-apache-beam", "apache-beam[gcp]"], + # ... + }, + ) And likewise this is the extra for the ``apache.beam`` provider: .. code-block:: python - extras_require={'google': ['apache-airflow-providers-google', 'apache-beam[gcp]']}, + extras_require = ({"google": ["apache-airflow-providers-google", "apache-beam[gcp]"]},) You can still run this with PIP version <= 20.2.4 and go back to the previous behaviour: diff --git a/docs/apache-airflow-providers-jdbc/operators.rst b/docs/apache-airflow-providers-jdbc/operators.rst index e0da3ff..ccd8670 100644 --- a/docs/apache-airflow-providers-jdbc/operators.rst +++ b/docs/apache-airflow-providers-jdbc/operators.rst @@ -36,7 +36,7 @@ To use this operator you need: * Install the python module jaydebeapi: .. code-block:: bash - pip install jaydebeapi + pip install apache-airflow[jdbc] * Install a `JVM <https://adoptopenjdk.net/installation.html>`_ and add a ``JAVA_HOME`` env variable. @@ -53,17 +53,17 @@ database is listening for new connections. .. 
code-block:: python - import apache-airflow[jdbc] - driver_class = "com.exasol.jdbc.EXADriver" driver_path = "/opt/airflow/drivers/exasol/EXASolution_JDBC-7.0.2/exajdbc.jar" - connection_url = "jdbc:exa:localhost" + connection_url = "jdbc:exa:localhost" credentials = ["", ""] - conn = jaydebeapi.connect(driver_class, - connection_url, - credentials, - driver_path,) + conn = jaydebeapi.connect( + driver_class, + connection_url, + credentials, + driver_path, + ) Usage ^^^^^ diff --git a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst index 8d9bd96..e2392e3 100644 --- a/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst +++ b/docs/apache-airflow-providers-postgres/operators/postgres_operator_howto_guide.rst @@ -70,10 +70,10 @@ Now let's refactor ``create_pet_table`` in our DAG: .. code-block:: python create_pet_table = PostgresOperator( - task_id="create_pet_table", - postgres_conn_id="postgres_default", - sql="sql/pet_schema.sql" - ) + task_id="create_pet_table", + postgres_conn_id="postgres_default", + sql="sql/pet_schema.sql", + ) Inserting data into a Postgres database table @@ -94,10 +94,10 @@ We can then create a PostgresOperator task that populate the ``pet`` table. .. code-block:: python populate_pet_table = PostgresOperator( - task_id="populate_pet_table", - postgres_conn_id="postgres_default", - sql="sql/pet_schema.sql" - ) + task_id="populate_pet_table", + postgres_conn_id="postgres_default", + sql="sql/pet_schema.sql", + ) Fetching records from your postgres database table @@ -108,10 +108,10 @@ Fetching records from your postgres database table can be as simple as: .. code-block:: python get_all_pets = PostgresOperator( - task_id="get_all_pets", - postgres_conn_id="postgres_default", - sql="SELECT * FROM pet;" - ) + task_id="get_all_pets", + postgres_conn_id="postgres_default", + sql="SELECT * FROM pet;", + ) @@ -128,14 +128,11 @@ To find the owner of the pet called 'Lester': .. code-block:: python get_birth_date = PostgresOperator( - task_id="get_birth_date", - postgres_conn_id="postgres_default", - sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s", - parameters={ - 'begin_date': '2020-01-01', - 'end_date': '2020-12-31' - } - ) + task_id="get_birth_date", + postgres_conn_id="postgres_default", + sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s", + parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"}, + ) Now lets refactor our ``get_birth_date`` task. Instead of dumping SQL statements directly into our code, let's tidy things up by creating a sql file. @@ -151,14 +148,11 @@ class. .. 
code-block:: python get_birth_date = PostgresOperator( - task_id="get_birth_date", - postgres_conn_id="postgres_default", - sql="sql/birth_date.sql", - params={ - 'begin_date': '2020-01-01', - 'end_date': '2020-12-31' - } - ) + task_id="get_birth_date", + postgres_conn_id="postgres_default", + sql="sql/birth_date.sql", + params={"begin_date": "2020-01-01", "end_date": "2020-12-31"}, + ) The complete Postgres Operator DAG ---------------------------------- diff --git a/docs/apache-airflow-providers/howto/create-update-providers.rst b/docs/apache-airflow-providers/howto/create-update-providers.rst index 91fb74e..f1426d2 100644 --- a/docs/apache-airflow-providers/howto/create-update-providers.rst +++ b/docs/apache-airflow-providers/howto/create-update-providers.rst @@ -196,15 +196,15 @@ any dependency add a empty list. .. code-block:: python PROVIDERS_REQUIREMENTS: Dict[str, List[str]] = { - ... - 'microsoft.winrm': winrm, - 'mongo': mongo, - 'mysql': mysql, - 'neo4j': neo4j, - '<NEW_PROVIDER>': [], - 'odbc': odbc, - ... - } + # ... + "microsoft.winrm": winrm, + "mongo": mongo, + "mysql": mysql, + "neo4j": neo4j, + "<NEW_PROVIDER>": [], + "odbc": odbc, + # ... + } In the ``CONTRIBUTING.rst`` adds: diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index b2ae4ae..9ce345d 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -150,16 +150,17 @@ Unit tests ensure that there is no incorrect code in your DAG. You can write uni from airflow.models import DagBag import unittest + class TestHelloWorldDAG(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.dagbag = DagBag() + @classmethod + def setUpClass(cls): + cls.dagbag = DagBag() - def test_dag_loaded(self): - dag = self.dagbag.get_dag(dag_id='hello_world') - assert self.dagbag.import_errors == {} - assert dag is not None - assert len(dag.tasks) == 1 + def test_dag_loaded(self): + dag = self.dagbag.get_dag(dag_id="hello_world") + assert self.dagbag.import_errors == {} + assert dag is not None + assert len(dag.tasks) == 1 **Unit test a DAG structure:** This is an example test want to verify the structure of a code-generated DAG against a dict object @@ -167,20 +168,26 @@ This is an example test want to verify the structure of a code-generated DAG aga .. 
code-block:: python import unittest + + class testClass(unittest.TestCase): - def assertDagDictEqual(self,source,dag): + def assertDagDictEqual(self, source, dag): assert dag.task_dict.keys() == source.keys() for task_id, downstream_list in source.items(): assert dag.has_task(task_id) task = dag.get_task(task_id) assert task.downstream_task_ids == set(downstream_list) + def test_dag(self): - self.assertDagDictEqual({ - "DummyInstruction_0": ["DummyInstruction_1"], - "DummyInstruction_1": ["DummyInstruction_2"], - "DummyInstruction_2": ["DummyInstruction_3"], - "DummyInstruction_3": [] - },dag) + self.assertDagDictEqual( + { + "DummyInstruction_0": ["DummyInstruction_1"], + "DummyInstruction_1": ["DummyInstruction_2"], + "DummyInstruction_2": ["DummyInstruction_3"], + "DummyInstruction_3": [], + }, + dag, + ) **Unit test for custom operator:** @@ -189,23 +196,28 @@ This is an example test want to verify the structure of a code-generated DAG aga import unittest from airflow.utils.state import State - DEFAULT_DATE = '2019-10-03' - TEST_DAG_ID = 'test_my_custom_operator' + DEFAULT_DATE = "2019-10-03" + TEST_DAG_ID = "test_my_custom_operator" + class MyCustomOperatorTest(unittest.TestCase): - def setUp(self): - self.dag = DAG(TEST_DAG_ID, schedule_interval='@daily', default_args={'start_date' : DEFAULT_DATE}) - self.op = MyCustomOperator( - dag=self.dag, - task_id='test', - prefix='s3://bucket/some/prefix', - ) - self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE) - - def test_execute_no_trigger(self): - self.ti.run(ignore_ti_state=True) - assert self.ti.state == State.SUCCESS - # Assert something related to tasks results + def setUp(self): + self.dag = DAG( + TEST_DAG_ID, + schedule_interval="@daily", + default_args={"start_date": DEFAULT_DATE}, + ) + self.op = MyCustomOperator( + dag=self.dag, + task_id="test", + prefix="s3://bucket/some/prefix", + ) + self.ti = TaskInstance(task=self.op, execution_date=DEFAULT_DATE) + + def test_execute_no_trigger(self): + self.ti.run(ignore_ti_state=True) + assert self.ti.state == State.SUCCESS + # Assert something related to tasks results Self-Checks ------------ @@ -221,10 +233,10 @@ Similarly, if you have a task that starts a microservice in Kubernetes or Mesos, task = PushToS3(...) check = S3KeySensor( - task_id='check_parquet_exists', - bucket_key="s3://bucket/key/foo.parquet", - poke_interval=0, - timeout=0 + task_id="check_parquet_exists", + bucket_key="s3://bucket/key/foo.parquet", + poke_interval=0, + timeout=0, ) task >> check @@ -243,10 +255,7 @@ You can use environment variables to parameterize the DAG. import os - dest = os.environ.get( - "MY_DAG_DEST_PATH", - "s3://default-target/path/" - ) + dest = os.environ.get("MY_DAG_DEST_PATH", "s3://default-target/path/") Mocking variables and connections ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -257,7 +266,7 @@ For variable, use :envvar:`AIRFLOW_VAR_{KEY}`. .. code-block:: python - with mock.patch.dict('os.environ', AIRFLOW_VAR_KEY="env-value"): + with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"): assert "env-value" == Variable.get("key") For connection, use :envvar:`AIRFLOW_CONN_{CONN_ID}`. @@ -271,4 +280,4 @@ For connection, use :envvar:`AIRFLOW_CONN_{CONN_ID}`. 
) conn_uri = conn.get_uri() with mock.patch.dict("os.environ", AIRFLOW_CONN_MY_CONN=conn_uri): - assert "cat" == Connection.get("my_conn").login + assert "cat" == Connection.get("my_conn").login diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst index 593eba6..76b9b23 100644 --- a/docs/apache-airflow/concepts/dags.rst +++ b/docs/apache-airflow/concepts/dags.rst @@ -243,27 +243,28 @@ The ``BranchPythonOperator`` can also be used with XComs allowing branching cont .. code-block:: python def branch_func(ti): - xcom_value = int(ti.xcom_pull(task_ids='start_task')) + xcom_value = int(ti.xcom_pull(task_ids="start_task")) if xcom_value >= 5: - return 'continue_task' + return "continue_task" else: - return 'stop_task' + return "stop_task" + start_op = BashOperator( - task_id='start_task', + task_id="start_task", bash_command="echo 5", xcom_push=True, dag=dag, ) branch_op = BranchPythonOperator( - task_id='branch_task', + task_id="branch_task", python_callable=branch_func, dag=dag, ) - continue_op = DummyOperator(task_id='continue_task', dag=dag) - stop_op = DummyOperator(task_id='stop_task', dag=dag) + continue_op = DummyOperator(task_id="continue_task", dag=dag) + stop_op = DummyOperator(task_id="stop_task", dag=dag) start_op >> branch_op >> [continue_op, stop_op] @@ -348,7 +349,7 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality .. code-block:: python - #dags/branch_without_trigger.py + # dags/branch_without_trigger.py import datetime as dt from airflow.models import DAG @@ -356,23 +357,22 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality from airflow.operators.python import BranchPythonOperator dag = DAG( - dag_id='branch_without_trigger', - schedule_interval='@once', - start_date=dt.datetime(2019, 2, 28) + dag_id="branch_without_trigger", + schedule_interval="@once", + start_date=dt.datetime(2019, 2, 28), ) - run_this_first = DummyOperator(task_id='run_this_first', dag=dag) + run_this_first = DummyOperator(task_id="run_this_first", dag=dag) branching = BranchPythonOperator( - task_id='branching', dag=dag, - python_callable=lambda: 'branch_a' + task_id="branching", dag=dag, python_callable=lambda: "branch_a" ) - branch_a = DummyOperator(task_id='branch_a', dag=dag) - follow_branch_a = DummyOperator(task_id='follow_branch_a', dag=dag) + branch_a = DummyOperator(task_id="branch_a", dag=dag) + follow_branch_a = DummyOperator(task_id="follow_branch_a", dag=dag) - branch_false = DummyOperator(task_id='branch_false', dag=dag) + branch_false = DummyOperator(task_id="branch_false", dag=dag) - join = DummyOperator(task_id='join', dag=dag) + join = DummyOperator(task_id="join", dag=dag) run_this_first >> branching branching >> branch_a >> follow_branch_a >> join @@ -462,6 +462,7 @@ To add labels, you can use them directly inline with the ``>>`` and ``<<`` opera .. code-block:: python from airflow.utils.edgemodifier import Label + my_task >> Label("When empty") >> other_task Or, you can pass a Label object to ``set_upstream``/``set_downstream``: @@ -469,6 +470,7 @@ Or, you can pass a Label object to ``set_upstream``/``set_downstream``: .. 
code-block:: python from airflow.utils.edgemodifier import Label + my_task.set_downstream(other_task, Label("When empty")) Here's an example DAG which illustrates labeling different branches: @@ -507,7 +509,7 @@ This is especially useful if your tasks are built dynamically from configuration ### My great DAG """ - dag = DAG('my_dag', default_args=default_args) + dag = DAG("my_dag", default_args=default_args) dag.doc_md = __doc__ t = BashOperator("foo", dag=dag) diff --git a/docs/apache-airflow/concepts/operators.rst b/docs/apache-airflow/concepts/operators.rst index 6c339e9..8d9473f 100644 --- a/docs/apache-airflow/concepts/operators.rst +++ b/docs/apache-airflow/concepts/operators.rst @@ -67,10 +67,11 @@ For example, say you want to pass the execution date as an environment variable # The execution date as YYYY-MM-DD date = "{{ ds }}" t = BashOperator( - task_id='test_env', - bash_command='/tmp/test.sh ', + task_id="test_env", + bash_command="/tmp/test.sh ", dag=dag, - env={'EXECUTION_DATE': date}) + env={"EXECUTION_DATE": date}, + ) Here, ``{{ ds }}`` is a macro, and because the ``env`` parameter of the ``BashOperator`` is templated with Jinja, the execution date will be available as an environment variable named ``EXECUTION_DATE`` in your Bash script. @@ -81,19 +82,18 @@ You can also use Jinja templating with nested fields, as long as these nested fi .. code-block:: python class MyDataReader: - template_fields = ['path'] + template_fields = ["path"] def __init__(self, my_path): self.path = my_path # [additional code here...] + t = PythonOperator( - task_id='transform_data', - python_callable=transform_data - op_args=[ - MyDataReader('/tmp/{{ ds }}/my_file') - ], + task_id="transform_data", + python_callable=transform_data, + op_args=[MyDataReader("/tmp/{{ ds }}/my_file")], dag=dag, ) @@ -104,27 +104,27 @@ Deep nested fields can also be substituted, as long as all intermediate fields a .. code-block:: python class MyDataTransformer: - template_fields = ['reader'] + template_fields = ["reader"] def __init__(self, my_reader): self.reader = my_reader # [additional code here...] + class MyDataReader: - template_fields = ['path'] + template_fields = ["path"] def __init__(self, my_path): self.path = my_path # [additional code here...] + t = PythonOperator( - task_id='transform_data', - python_callable=transform_data - op_args=[ - MyDataTransformer(MyDataReader('/tmp/{{ ds }}/my_file')) - ], + task_id="transform_data", + python_callable=transform_data, + op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))], dag=dag, ) @@ -133,10 +133,10 @@ You can pass custom options to the Jinja ``Environment`` when creating your DAG. .. code-block:: python my_dag = DAG( - dag_id='my-dag', + dag_id="my-dag", jinja_environment_kwargs={ - 'keep_trailing_newline': True, - # some other jinja2 Environment options here + "keep_trailing_newline": True, + # some other jinja2 Environment options here }, ) @@ -155,8 +155,9 @@ Now, when the following task is run, ``order_data`` argument is passed a string, .. 
code-block:: python transform = PythonOperator( - task_id="transform", op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"}, - python_callable=transform + task_id="transform", + op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"}, + python_callable=transform, ) @@ -172,24 +173,25 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows: render_template_as_native_obj=True, ) + def extract(): data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' return json.loads(data_string) + def transform(order_data): print(type(order_data)) for value in order_data.values(): total_order_value += value return {"total_order_value": total_order_value} - extract_task = PythonOperator( - task_id="extract", - python_callable=extract - ) + + extract_task = PythonOperator(task_id="extract", python_callable=extract) transform_task = PythonOperator( - task_id="transform", op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"}, - python_callable=transform + task_id="transform", + op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"}, + python_callable=transform, ) extract_task >> transform_task diff --git a/docs/apache-airflow/concepts/pools.rst b/docs/apache-airflow/concepts/pools.rst index 5b10fdd..f6c97d7 100644 --- a/docs/apache-airflow/concepts/pools.rst +++ b/docs/apache-airflow/concepts/pools.rst @@ -29,9 +29,9 @@ Tasks can then be associated with one of the existing pools by using the ``pool` .. code-block:: python aggregate_db_message_job = BashOperator( - task_id='aggregate_db_message_job', + task_id="aggregate_db_message_job", execution_timeout=timedelta(hours=3), - pool='ep_data_pipeline_db_msg_agg', + pool="ep_data_pipeline_db_msg_agg", bash_command=aggregate_db_message_job_cmd, dag=dag, ) diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst index 0752990..0ac9f37 100644 --- a/docs/apache-airflow/dag-run.rst +++ b/docs/apache-airflow/dag-run.rst @@ -96,22 +96,23 @@ in the configuration file. When turned off, the scheduler creates a DAG run only default_args = { - 'owner': 'airflow', - 'depends_on_past': False, - 'email': ['airf...@example.com'], - 'email_on_failure': False, - 'email_on_retry': False, - 'retries': 1, - 'retry_delay': timedelta(minutes=5) + "owner": "airflow", + "depends_on_past": False, + "email": ["airf...@example.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), } dag = DAG( - 'tutorial', + "tutorial", default_args=default_args, start_date=datetime(2015, 12, 1), - description='A simple tutorial DAG', - schedule_interval='@daily', - catchup=False) + description="A simple tutorial DAG", + schedule_interval="@daily", + catchup=False, + ) In the example above, if the DAG is picked up by the scheduler daemon on 2016-01-02 at 6 AM, (or from the command line), a single DAG Run will be created, with an `execution_date` of 2016-01-01, @@ -214,7 +215,7 @@ Example of a parameterized DAG: dag = DAG("example_parameterized_dag", schedule_interval=None, start_date=days_ago(2)) parameterized_task = BashOperator( - task_id='parameterized_task', + task_id="parameterized_task", bash_command="echo value: {{ dag_run.conf['conf1'] }}", dag=dag, ) diff --git a/docs/apache-airflow/dag-serialization.rst b/docs/apache-airflow/dag-serialization.rst index 72b8f37..842ab9d 100644 --- a/docs/apache-airflow/dag-serialization.rst +++ b/docs/apache-airflow/dag-serialization.rst @@ -114,4 +114,5 @@ define a ``json`` variable in local Airflow settings (``airflow_local_settings.p .. 
code-block:: python import ujson + json = ujson diff --git a/docs/apache-airflow/executor/debug.rst b/docs/apache-airflow/executor/debug.rst index c029826..e956e11 100644 --- a/docs/apache-airflow/executor/debug.rst +++ b/docs/apache-airflow/executor/debug.rst @@ -42,10 +42,11 @@ It will run a backfill job: .. code-block:: python - if __name__ == '__main__': - from airflow.utils.state import State - dag.clear(dag_run_state=State.NONE) - dag.run() + if __name__ == "__main__": + from airflow.utils.state import State + + dag.clear(dag_run_state=State.NONE) + dag.run() 2. Setup ``AIRFLOW__CORE__EXECUTOR=DebugExecutor`` in run configuration of your IDE. In diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst index 5122312..b9f6ccb 100644 --- a/docs/apache-airflow/faq.rst +++ b/docs/apache-airflow/faq.rst @@ -180,9 +180,8 @@ fields. They are also included in the context dictionary given to an Operator's .. code-block:: python class MyOperator(BaseOperator): - def execute(self, context): - logging.info(context['execution_date']) + logging.info(context["execution_date"]) Note that ``ds`` refers to date_string, not date start as may be confusing to some. @@ -208,11 +207,11 @@ simple dictionary. for i in range(10): - dag_id = f'foo_{i}' + dag_id = f"foo_{i}" globals()[dag_id] = DAG(dag_id) # or better, call a function that returns a DAG object! - other_dag_id = f'bar_{i}' + other_dag_id = f"bar_{i}" globals()[other_dag_id] = create_dag(other_dag_id) Even though Airflow supports multiple DAG definition per python file, dynamically generated or otherwise, it is not @@ -244,27 +243,17 @@ commonly attempted in ``user_defined_macros``. .. code-block:: python dag = DAG( - ... - user_defined_macros={ - 'my_custom_macro': 'day={{ ds }}' - } + # ... + user_defined_macros={"my_custom_macro": "day={{ ds }}"} ) - bo = BashOperator( - task_id='my_task', - bash_command="echo {{ my_custom_macro }}", - dag=dag - ) + bo = BashOperator(task_id="my_task", bash_command="echo {{ my_custom_macro }}", dag=dag) This will echo "day={{ ds }}" instead of "day=2020-01-01" for a dagrun with the execution date 2020-01-01 00:00:00. .. code-block:: python - bo = BashOperator( - task_id='my_task', - bash_command="echo day={{ ds }}", - dag=dag - ) + bo = BashOperator(task_id="my_task", bash_command="echo day={{ ds }}", dag=dag) By using the ds macros directly in the template_field, the rendered value results in "day=2020-01-01". @@ -318,16 +307,15 @@ upstream task. def b_func(): pass - @dag( - schedule_interval='@once', - start_date=datetime(2021, 1, 1) - ) + + @dag(schedule_interval="@once", start_date=datetime(2021, 1, 1)) def my_dag(): a = a_func() b = b_func() a >> b + dag = my_dag() See :ref:`concepts:trigger-rules` for more information. diff --git a/docs/apache-airflow/howto/add-dag-tags.rst b/docs/apache-airflow/howto/add-dag-tags.rst index 02d7c07..236414a 100644 --- a/docs/apache-airflow/howto/add-dag-tags.rst +++ b/docs/apache-airflow/howto/add-dag-tags.rst @@ -32,11 +32,7 @@ In your Dag file, pass a list of tags you want to add to DAG object: .. 
code-block:: python - dag = DAG( - dag_id='example_dag_tag', - schedule_interval='0 0 * * *', - tags=['example'] - ) + dag = DAG(dag_id="example_dag_tag", schedule_interval="0 0 * * *", tags=["example"]) **Screenshot**: diff --git a/docs/apache-airflow/howto/connection.rst b/docs/apache-airflow/howto/connection.rst index 09aa294..2b8afd2 100644 --- a/docs/apache-airflow/howto/connection.rst +++ b/docs/apache-airflow/howto/connection.rst @@ -221,15 +221,15 @@ The above URI would produce a ``Connection`` object equivalent to the following: .. code-block:: python Connection( - conn_id='', - conn_type='my_conn_type', + conn_id="", + conn_type="my_conn_type", description=None, - login='my-login', - password='my-password', - host='my-host', + login="my-login", + password="my-password", + host="my-host", port=5432, - schema='my-schema', - extra=json.dumps(dict(param1='val1', param2='val2')) + schema="my-schema", + extra=json.dumps(dict(param1="val1", param2="val2")), ) @@ -245,16 +245,15 @@ convenience method :py:meth:`~airflow.models.connection.Connection.get_uri`. It >>> import json >>> from airflow.models.connection import Connection - >>> c = Connection( - >>> conn_id='some_conn', - >>> conn_type='mysql', - >>> description='connection description', - >>> host='myhost.com', - >>> login='myname', - >>> password='mypassword', - >>> extra=json.dumps(dict(this_param='some val', that_param='other val*')), - >>> ) + ... conn_id="some_conn", + ... conn_type="mysql", + ... description="connection description", + ... host="myhost.com", + ... login="myname", + ... password="mypassword", + ... extra=json.dumps(dict(this_param="some val", that_param="other val*")), + ... ) >>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'") AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassw...@myhost.com?this_param=some+val&that_param=other+val%2A' @@ -288,16 +287,16 @@ For example: .. code-block:: pycon - >>> extra_dict = {'my_val': ['list', 'of', 'values'], 'extra': {'nested': {'json': 'val'}}} + >>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}} >>> c = Connection( - >>> conn_type='scheme', - >>> host='host/location', - >>> schema='schema', - >>> login='user', - >>> password='password', - >>> port=1234, - >>> extra=json.dumps(extra_dict), - >>> ) + ... conn_type="scheme", + ... host="host/location", + ... schema="schema", + ... login="user", + ... password="password", + ... port=1234, + ... extra=json.dumps(extra_dict), + ... ) >>> uri = c.get_uri() >>> uri 'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D' @@ -320,7 +319,9 @@ You can verify a URI is parsed correctly like so: >>> from airflow.models.connection import Connection - >>> c = Connection(uri='my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2') + >>> c = Connection( + ... uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2" + ... ) >>> print(c.login) my-login >>> print(c.password) @@ -342,14 +343,18 @@ For example if your password has a ``/``, this fails: .. code-block:: pycon - >>> c = Connection(uri='my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1¶m2=val2') + >>> c = Connection( + ... uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1¶m2=val2" + ... 
) ValueError: invalid literal for int() with base 10: 'my-pa' To fix this, you can encode with :func:`~urllib.parse.quote_plus`: .. code-block:: pycon - >>> c = Connection(uri='my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1¶m2=val2') + >>> c = Connection( + ... uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1¶m2=val2" + ... ) >>> print(c.password) my-pa/ssword diff --git a/docs/apache-airflow/howto/custom-operator.rst b/docs/apache-airflow/howto/custom-operator.rst index 648ef73..091f076 100644 --- a/docs/apache-airflow/howto/custom-operator.rst +++ b/docs/apache-airflow/howto/custom-operator.rst @@ -46,12 +46,9 @@ Let's implement an example ``HelloOperator`` in a new file ``hello_operator.py`` from airflow.models.baseoperator import BaseOperator - class HelloOperator(BaseOperator): - def __init__( - self, - name: str, - **kwargs) -> None: + class HelloOperator(BaseOperator): + def __init__(self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name @@ -75,7 +72,7 @@ You can now use the derived custom operator as follows: from custom_operator.hello_operator import HelloOperator with dag: - hello_task = HelloOperator(task_id='sample-task', name='foo_bar') + hello_task = HelloOperator(task_id="sample-task", name="foo_bar") You also can keep using your plugins folder for storing your custom operators. If you have the file ``hello_operator.py`` within the plugins folder, you can import the operator as follows: @@ -110,26 +107,19 @@ Let's extend our previous example to fetch name from MySQL: .. code-block:: python class HelloDBOperator(BaseOperator): - - def __init__( - self, - name: str, - mysql_conn_id: str, - database: str, - **kwargs) -> None: - super().__init__(**kwargs) - self.name = name - self.mysql_conn_id = mysql_conn_id - self.database = database - - def execute(self, context): - hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, - schema=self.database) - sql = "select name from user" - result = hook.get_first(sql) - message = "Hello {}".format(result['name']) - print(message) - return message + def __init__(self, name: str, mysql_conn_id: str, database: str, **kwargs) -> None: + super().__init__(**kwargs) + self.name = name + self.mysql_conn_id = mysql_conn_id + self.database = database + + def execute(self, context): + hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database) + sql = "select name from user" + result = hook.get_first(sql) + message = "Hello {}".format(result["name"]) + print(message) + return message When the operator invokes the query on the hook object, a new connection gets created if it doesn't exist. The hook retrieves the auth parameters such as username and password from Airflow @@ -149,9 +139,9 @@ Override ``ui_fgcolor`` to change the color of the label. .. code-block:: python class HelloOperator(BaseOperator): - ui_color = '#ff0000' - ui_fgcolor = '#000000' - .... + ui_color = "#ff0000" + ui_fgcolor = "#000000" + # ... Templating ^^^^^^^^^^^ @@ -163,12 +153,9 @@ the operator. class HelloOperator(BaseOperator): - template_fields = ['name'] + template_fields = ["name"] - def __init__( - self, - name: str, - **kwargs) -> None: + def __init__(self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.name = name @@ -182,7 +169,9 @@ You can use the template as follows: .. 
code-block:: python with dag: - hello_task = HelloOperator(task_id='task_id_1', dag=dag, name='{{ task_instance.task_id }}') + hello_task = HelloOperator( + task_id="task_id_1", dag=dag, name="{{ task_instance.task_id }}" + ) In this example, Jinja looks for the ``name`` parameter and substitutes ``{{ task_instance.task_id }}`` with ``task_id_1``. @@ -197,13 +186,10 @@ with actual value. Note that Jinja substitutes the operator attributes and not t class HelloOperator(BaseOperator): - template_fields = ['guest_name'] - template_ext = ['.sql'] + template_fields = ["guest_name"] + template_ext = [".sql"] - def __init__( - self, - name: str, - **kwargs) -> None: + def __init__(self, name: str, **kwargs) -> None: super().__init__(**kwargs) self.guest_name = name @@ -215,13 +201,10 @@ from template field renders in Web UI. For example: .. code-block:: python class MyRequestOperator(BaseOperator): - template_fields = ['request_body'] - template_fields_renderers = {'request_body': 'json'} + template_fields = ["request_body"] + template_fields_renderers = {"request_body": "json"} - def __init__( - self, - request_body: str, - **kwargs) -> None: + def __init__(self, request_body: str, **kwargs) -> None: super().__init__(**kwargs) self.request_body = request_body diff --git a/docs/apache-airflow/howto/customize-state-colors-ui.rst b/docs/apache-airflow/howto/customize-state-colors-ui.rst index ae7827c..db9a919 100644 --- a/docs/apache-airflow/howto/customize-state-colors-ui.rst +++ b/docs/apache-airflow/howto/customize-state-colors-ui.rst @@ -33,15 +33,15 @@ following steps: .. code-block:: python STATE_COLORS = { - "queued": 'darkgray', - "running": '#01FF70', - "success": '#2ECC40', - "failed": 'firebrick', - "up_for_retry": 'yellow', - "up_for_reschedule": 'turquoise', - "upstream_failed": 'orange', - "skipped": 'darkorchid', - "scheduled": 'tan', + "queued": "darkgray", + "running": "#01FF70", + "success": "#2ECC40", + "failed": "firebrick", + "up_for_retry": "yellow", + "up_for_reschedule": "turquoise", + "upstream_failed": "orange", + "skipped": "darkorchid", + "scheduled": "tan", } diff --git a/docs/apache-airflow/howto/define_extra_link.rst b/docs/apache-airflow/howto/define_extra_link.rst index cf7aba8..2a89323 100644 --- a/docs/apache-airflow/howto/define_extra_link.rst +++ b/docs/apache-airflow/howto/define_extra_link.rst @@ -36,16 +36,15 @@ The following code shows how to add extra links to an operator via Plugins: class GoogleLink(BaseOperatorLink): - name = 'Google' + name = "Google" def get_link(self, operator, dttm): return "https://www.google.com" + class MyFirstOperator(BaseOperator): - operator_extra_links = ( - GoogleLink(), - ) + operator_extra_links = (GoogleLink(),) def __init__(self, **kwargs): super().__init__(**kwargs) @@ -53,10 +52,13 @@ The following code shows how to add extra links to an operator via Plugins: def execute(self, context): self.log.info("Hello World!") + # Defining the plugin class class AirflowExtraLinkPlugin(AirflowPlugin): name = "extra_link_plugin" - operator_extra_links = [GoogleLink(), ] + operator_extra_links = [ + GoogleLink(), + ] .. note:: Operator Extra Links should be registered via Airflow Plugins or custom Airflow Provider to work. 
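As a supplementary illustration (not part of the patch above), a DAG that instantiates the ``MyFirstOperator`` from the plugin example is what actually surfaces the registered "Google" link in the UI. A minimal sketch, assuming the operator and plugin classes shown above are importable, and using a purely illustrative ``dag_id``, start date and schedule:

.. code-block:: python

    from datetime import datetime

    from airflow import DAG

    # MyFirstOperator is assumed to come from the plugin module sketched above;
    # dag_id, start_date and schedule_interval are placeholders for illustration.
    with DAG(
        dag_id="extra_link_demo",
        start_date=datetime(2021, 1, 1),
        schedule_interval=None,
    ) as dag:
        hello = MyFirstOperator(task_id="say_hello")

With the plugin registered, the "Google" extra link should then appear as a button on the task instance details view for ``say_hello``.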
@@ -83,24 +85,28 @@ tasks using :class:`~airflow.providers.amazon.aws.transfers.gcs_to_s3.GCSToS3Ope from airflow.models.baseoperator import BaseOperatorLink from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator + class S3LogLink(BaseOperatorLink): - name = 'S3' + name = "S3" # Add list of all the operators to which you want to add this OperatorLinks # Example: operators = [GCSToS3Operator, GCSToBigQueryOperator] operators = [GCSToS3Operator] def get_link(self, operator, dttm): - return 'https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}'.format( + return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}".format( dag_id=operator.dag_id, task_id=operator.task_id, execution_date=dttm, ) + # Defining the plugin class class AirflowExtraLinkPlugin(AirflowPlugin): name = "extra_link_plugin" - operator_extra_links = [S3LogLink(), ] + operator_extra_links = [ + S3LogLink(), + ] @@ -117,25 +123,29 @@ Console, but if we wanted to change that link we could: from airflow.providers.google.cloud.operators.bigquery import BigQueryOperator # Change from https to http just to display the override - BIGQUERY_JOB_DETAILS_LINK_FMT = 'http://console.cloud.google.com/bigquery?j={job_id}' + BIGQUERY_JOB_DETAILS_LINK_FMT = "http://console.cloud.google.com/bigquery?j={job_id}" class BigQueryConsoleLink(BaseOperatorLink): """ Helper class for constructing BigQuery link. """ - name = 'BigQuery Console' + + name = "BigQuery Console" operators = [BigQueryOperator] def get_link(self, operator, dttm): ti = TaskInstance(task=operator, execution_date=dttm) - job_id = ti.xcom_pull(task_ids=operator.task_id, key='job_id') - return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else '' + job_id = ti.xcom_pull(task_ids=operator.task_id, key="job_id") + return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id) if job_id else "" + # Defining the plugin class class AirflowExtraLinkPlugin(AirflowPlugin): name = "extra_link_plugin" - operator_extra_links = [BigQueryConsoleLink(), ] + operator_extra_links = [ + BigQueryConsoleLink(), + ] **Adding Operator Links via Providers** diff --git a/docs/apache-airflow/howto/operator/bash.rst b/docs/apache-airflow/howto/operator/bash.rst index debdd26..6af6be6 100644 --- a/docs/apache-airflow/howto/operator/bash.rst +++ b/docs/apache-airflow/howto/operator/bash.rst @@ -70,8 +70,8 @@ inside the bash_command, as below: bash_task = BashOperator( task_id="bash_task", - bash_command='echo "here is the message: \'$message\'"', - env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'}, + bash_command="echo \"here is the message: '$message'\"", + env={"message": '{{ dag_run.conf["message"] if dag_run else "" }}'}, ) Skipping @@ -100,14 +100,13 @@ template to it, which will fail. .. code-block:: python t2 = BashOperator( - task_id='bash_example', - + task_id="bash_example", # This fails with 'Jinja template not found' error # bash_command="/home/batcher/test.sh", - # This works (has a space after) bash_command="/home/batcher/test.sh ", - dag=dag) + dag=dag, + ) However, if you want to use templating in your bash script, do not add the space and instead put your bash script in a location relative to the directory containing @@ -119,10 +118,11 @@ as shown below: .. 
code-block:: python t2 = BashOperator( - task_id='bash_example', + task_id="bash_example", # "scripts" folder is under "/usr/local/airflow/dags" bash_command="scripts/test.sh", - dag=dag) + dag=dag, + ) Creating separate folder for bash scripts may be desirable for many reasons, like separating your script's logic and pipeline code, allowing for proper code highlighting @@ -138,7 +138,8 @@ Example: dag = DAG("example_bash_dag", template_searchpath="/opt/scripts") t2 = BashOperator( - task_id='bash_example', + task_id="bash_example", # "test.sh" is a file under "/opt/scripts" bash_command="test.sh ", - dag=dag) + dag=dag, + ) diff --git a/docs/apache-airflow/howto/variable.rst b/docs/apache-airflow/howto/variable.rst index 3ea613b..7cb9377 100644 --- a/docs/apache-airflow/howto/variable.rst +++ b/docs/apache-airflow/howto/variable.rst @@ -53,6 +53,7 @@ You can use them in your DAGs as: .. code-block:: python from airflow.models import Variable + foo = Variable.get("foo") foo_json = Variable.get("foo_baz", deserialize_json=True) diff --git a/docs/apache-airflow/kubernetes.rst b/docs/apache-airflow/kubernetes.rst index 873fb00..b88a83d 100644 --- a/docs/apache-airflow/kubernetes.rst +++ b/docs/apache-airflow/kubernetes.rst @@ -57,5 +57,6 @@ to every worker pod launched by KubernetesExecutor or KubernetesPodOperator. from kubernetes.client.models import V1Pod + def pod_mutation_hook(pod: V1Pod): - pod.metadata.annotations['airflow.apache.org/launched-by'] = 'Tests' + pod.metadata.annotations["airflow.apache.org/launched-by"] = "Tests" diff --git a/docs/apache-airflow/lineage.rst b/docs/apache-airflow/lineage.rst index 362d3e6..227a386 100644 --- a/docs/apache-airflow/lineage.rst +++ b/docs/apache-airflow/lineage.rst @@ -40,20 +40,19 @@ works. FILE_CATEGORIES = ["CAT1", "CAT2", "CAT3"] - args = { - 'owner': 'airflow', - 'start_date': days_ago(2) - } + args = {"owner": "airflow", "start_date": days_ago(2)} dag = DAG( - dag_id='example_lineage', default_args=args, - schedule_interval='0 0 * * *', - dagrun_timeout=timedelta(minutes=60)) + dag_id="example_lineage", + default_args=args, + schedule_interval="0 0 * * *", + dagrun_timeout=timedelta(minutes=60), + ) f_final = File(url="/tmp/final") - run_this_last = DummyOperator(task_id='run_this_last', dag=dag, - inlets=AUTO, - outlets=f_final) + run_this_last = DummyOperator( + task_id="run_this_last", dag=dag, inlets=AUTO, outlets=f_final + ) f_in = File(url="/tmp/whole_directory/") outlets = [] @@ -62,9 +61,7 @@ works. outlets.append(f_out) run_this = BashOperator( - task_id='run_me_first', bash_command='echo 1', dag=dag, - inlets=f_in, - outlets=outlets + task_id="run_me_first", bash_command="echo 1", dag=dag, inlets=f_in, outlets=outlets ) run_this.set_downstream(run_this_last) @@ -113,6 +110,8 @@ The backend should inherit from ``airflow.lineage.LineageBackend``. from airflow.lineage.backend import LineageBackend + class ExampleBackend(LineageBackend): - def send_lineage(self, operator, inlets=None, outlets=None, context=None): - # Send the info to some external service + def send_lineage(self, operator, inlets=None, outlets=None, context=None): + ... + # Send the info to some external service diff --git a/docs/apache-airflow/modules_management.rst b/docs/apache-airflow/modules_management.rst index abe3cf0..bb71b6f 100644 --- a/docs/apache-airflow/modules_management.rst +++ b/docs/apache-airflow/modules_management.rst @@ -106,7 +106,7 @@ When we import this package, it should print the above message. 
import setuptools setuptools.setup( - name='airflow_operators', + name="airflow_operators", ) 5. Build the wheel: diff --git a/docs/apache-airflow/plugins.rst b/docs/apache-airflow/plugins.rst index 6f8f9a4..d59659e 100644 --- a/docs/apache-airflow/plugins.rst +++ b/docs/apache-airflow/plugins.rst @@ -122,8 +122,8 @@ looks like: # to protect against extra parameters injected into the on_load(...) # function in future changes def on_load(*args, **kwargs): - # ... perform Plugin boot actions - pass + # ... perform Plugin boot actions + pass # A list of global operator extra links that can redirect users to # external systems. These extra links will be available on the @@ -133,7 +133,6 @@ looks like: # operator level. global_operator_extra_links = [] - # A list of operator extra links to override or add operator links # to existing Airflow Operators. # These extra links will be available on the task page in form of @@ -172,17 +171,21 @@ definitions in Airflow. class PluginHook(BaseHook): pass + # Will show up under airflow.macros.test_plugin.plugin_macro # and in templates through {{ macros.test_plugin.plugin_macro }} def plugin_macro(): pass + # Creating a flask blueprint to integrate the templates and static folder bp = Blueprint( - "test_plugin", __name__, - template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder - static_folder='static', - static_url_path='/static/test_plugin') + "test_plugin", + __name__, + template_folder="templates", # registers airflow/plugins/templates as a Jinja template folder + static_folder="static", + static_url_path="/static/test_plugin", + ) # Creating a flask appbuilder BaseView class TestAppBuilderBaseView(AppBuilderBaseView): @@ -192,6 +195,7 @@ definitions in Airflow. def test(self): return self.render_template("test_plugin/test.html", content="Hello galaxy!") + # Creating a flask appbuilder BaseView class TestAppBuilderBaseNoMenuView(AppBuilderBaseView): default_view = "test" @@ -200,15 +204,16 @@ definitions in Airflow. def test(self): return self.render_template("test_plugin/test.html", content="Hello galaxy!") + v_appbuilder_view = TestAppBuilderBaseView() - v_appbuilder_package = {"name": "Test View", - "category": "Test Plugin", - "view": v_appbuilder_view} + v_appbuilder_package = { + "name": "Test View", + "category": "Test Plugin", + "view": v_appbuilder_view, + } v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView() - v_appbuilder_nomenu_package = { - "view": v_appbuilder_nomenu_view - } + v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view} # Creating flask appbuilder Menu Items appbuilder_mitem = { @@ -229,16 +234,17 @@ definitions in Airflow. def get_link(self, operator, dttm): return "https://www.google.com" + # A list of operator extra links to override or add operator links # to existing Airflow Operators. # These extra links will be available on the task page in form of # buttons. class S3LogLink(BaseOperatorLink): - name = 'S3' + name = "S3" operators = [GCSToS3Operator] def get_link(self, operator, dttm): - return 'https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}'.format( + return "https://s3.amazonaws.com/airflow-logs/{dag_id}/{task_id}/{execution_date}".format( dag_id=operator.dag_id, task_id=operator.task_id, execution_date=dttm, @@ -253,8 +259,12 @@ definitions in Airflow. 
flask_blueprints = [bp] appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package] appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel] - global_operator_extra_links = [GoogleLink(),] - operator_extra_links = [S3LogLink(), ] + global_operator_extra_links = [ + GoogleLink(), + ] + operator_extra_links = [ + S3LogLink(), + ] Note on role based views @@ -277,10 +287,11 @@ some views using a decorator. from airflow.www.app import csrf + @csrf.exempt def my_handler(): # ... - return 'ok' + return "ok" Plugins as Python packages -------------------------- @@ -302,14 +313,17 @@ will automatically load the registered plugins from the entrypoint list. # Creating a flask blueprint to integrate the templates and static folder bp = Blueprint( - "test_plugin", __name__, - template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder - static_folder='static', - static_url_path='/static/test_plugin') + "test_plugin", + __name__, + template_folder="templates", # registers airflow/plugins/templates as a Jinja template folder + static_folder="static", + static_url_path="/static/test_plugin", + ) + class MyAirflowPlugin(AirflowPlugin): - name = 'my_namespace' - flask_blueprints = [bp] + name = "my_namespace" + flask_blueprints = [bp] .. code-block:: python @@ -317,12 +331,10 @@ will automatically load the registered plugins from the entrypoint list. setup( name="my-package", - ... - entry_points = { - 'airflow.plugins': [ - 'my_plugin = my_package.my_plugin:MyAirflowPlugin' - ] - } + # ... + entry_points={ + "airflow.plugins": ["my_plugin = my_package.my_plugin:MyAirflowPlugin"] + }, ) Automatic reloading webserver diff --git a/docs/apache-airflow/security/secrets/fernet.rst b/docs/apache-airflow/security/secrets/fernet.rst index 3ecbd0d..28c06fe 100644 --- a/docs/apache-airflow/security/secrets/fernet.rst +++ b/docs/apache-airflow/security/secrets/fernet.rst @@ -43,8 +43,9 @@ If you need to generate a new fernet key you can use the following code snippet. .. code-block:: python from cryptography.fernet import Fernet - fernet_key= Fernet.generate_key() - print(fernet_key.decode()) # your fernet_key, keep it in secured place! + + fernet_key = Fernet.generate_key() + print(fernet_key.decode()) # your fernet_key, keep it in secured place! Rotating encryption keys diff --git a/docs/apache-airflow/security/secrets/index.rst b/docs/apache-airflow/security/secrets/index.rst index fd5bbd0..96d576b 100644 --- a/docs/apache-airflow/security/secrets/index.rst +++ b/docs/apache-airflow/security/secrets/index.rst @@ -78,6 +78,7 @@ your DAG file or operator's ``execute`` function using the ``mask_secret`` funct @task def my_func(): from airflow.utils.log.secrets_masker import mask_secret + mask_secret("custom_value") ... @@ -88,9 +89,9 @@ or class MyOperator(BaseOperator): - def execute(self, context): from airflow.utils.log.secrets_masker import mask_secret + mask_secret("custom_value") ... diff --git a/docs/apache-airflow/timezone.rst b/docs/apache-airflow/timezone.rst index 63d1cec..449393c 100644 --- a/docs/apache-airflow/timezone.rst +++ b/docs/apache-airflow/timezone.rst @@ -75,7 +75,7 @@ Because Airflow uses time-zone-aware datetime objects. If your code creates date from airflow.utils import timezone now = timezone.utcnow() - a_date = timezone.datetime(2017,1,1) + a_date = timezone.datetime(2017, 1, 1) Interpretation of naive datetime objects @@ -90,14 +90,11 @@ words if you have a default time zone setting of ``Europe/Amsterdam`` and create .. 
code-block:: python - default_args=dict( - start_date=datetime(2016, 1, 1), - owner='airflow' - ) + default_args = dict(start_date=datetime(2016, 1, 1), owner="airflow") - dag = DAG('my_dag', default_args=default_args) - op = DummyOperator(task_id='dummy', dag=dag) - print(op.owner) # Airflow + dag = DAG("my_dag", default_args=default_args) + op = DummyOperator(task_id="dummy", dag=dag) + print(op.owner) # Airflow Unfortunately, during DST transitions, some datetimes don’t exist or are ambiguous. In such situations, pendulum raises an exception. That’s why you should always create aware @@ -137,14 +134,11 @@ using ``pendulum``. local_tz = pendulum.timezone("Europe/Amsterdam") - default_args=dict( - start_date=datetime(2016, 1, 1, tzinfo=local_tz), - owner='airflow' - ) + default_args = dict(start_date=datetime(2016, 1, 1, tzinfo=local_tz), owner="airflow") - dag = DAG('my_tz_dag', default_args=default_args) - op = DummyOperator(task_id='dummy', dag=dag) - print(dag.timezone) # <Timezone [Europe/Amsterdam]> + dag = DAG("my_tz_dag", default_args=default_args) + op = DummyOperator(task_id="dummy", dag=dag) + print(dag.timezone) # <Timezone [Europe/Amsterdam]> Please note that while it is possible to set a ``start_date`` and ``end_date`` for Tasks always the DAG timezone or global timezone (in that order) will be used to calculate the next execution date. Upon first encounter diff --git a/docs/apache-airflow/tutorial_taskflow_api.rst b/docs/apache-airflow/tutorial_taskflow_api.rst index 39820b2..03da9e0 100644 --- a/docs/apache-airflow/tutorial_taskflow_api.rst +++ b/docs/apache-airflow/tutorial_taskflow_api.rst @@ -214,11 +214,11 @@ Building this dependency is shown in the code below: A simple Extract task to get data ready for the rest of the data pipeline, by reading the data from a file into a pandas dataframe """ - order_data_file = '/tmp/order_data.csv' + order_data_file = "/tmp/order_data.csv" order_data_df = pd.read_csv(order_data_file) - file_task = FileSensor(task_id='check_file', filepath='/tmp/order_data.csv') + file_task = FileSensor(task_id="check_file", filepath="/tmp/order_data.csv") order_data = extract_from_file() file_task >> order_data diff --git a/docs/apache-airflow/ui.rst b/docs/apache-airflow/ui.rst index 728c2ae..26a0893 100644 --- a/docs/apache-airflow/ui.rst +++ b/docs/apache-airflow/ui.rst @@ -36,7 +36,7 @@ For example: .. code-block:: python - dag = DAG('dag', tags=['team1', 'sql']) + dag = DAG("dag", tags=["team1", "sql"]) ------------ diff --git a/docs/apache-airflow/upgrading-to-2.rst b/docs/apache-airflow/upgrading-to-2.rst index 96293ae..1473e4f 100644 --- a/docs/apache-airflow/upgrading-to-2.rst +++ b/docs/apache-airflow/upgrading-to-2.rst @@ -146,14 +146,14 @@ The behavior can be reverted when instantiating a DAG. import jinja2 - dag = DAG('simple_dag', template_undefined=jinja2.Undefined) + dag = DAG("simple_dag", template_undefined=jinja2.Undefined) Alternatively, it is also possible to override each Jinja Template variable on an individual basis by using the ``| default`` Jinja filter as shown below. .. 
code-block:: python - {{ a | default(1) }} + {{a | default(1)}} @@ -174,23 +174,18 @@ Whereas previously a user would import each individual class to build the pod as from airflow.kubernetes.volume_mount import VolumeMount - volume_config = { - 'persistentVolumeClaim': { - 'claimName': 'test-volume' - } - } - volume = Volume(name='test-volume', configs=volume_config) - volume_mount = VolumeMount('test-volume', - mount_path='/root/mount_file', - sub_path=None, - read_only=True) + volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}} + volume = Volume(name="test-volume", configs=volume_config) + volume_mount = VolumeMount( + "test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True + ) - port = Port('http', 80) - secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn') - secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn') + port = Port("http", 80) + secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn") + secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn") k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo", "10"], @@ -219,23 +214,25 @@ Now the user can use the ``kubernetes.client.models`` class as a single point of from airflow.kubernetes.secret import Secret - configmaps = ['test-configmap-1', 'test-configmap-2'] + configmaps = ["test-configmap-1", "test-configmap-2"] volume = k8s.V1Volume( - name='test-volume', - persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'), + name="test-volume", + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource( + claim_name="test-volume" + ), ) - port = k8s.V1ContainerPort(name='http', container_port=80) - secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn') - secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn') - secret_all_keys = Secret('env', None, 'airflow-secrets-2') + port = k8s.V1ContainerPort(name="http", container_port=80) + secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn") + secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn") + secret_all_keys = Secret("env", None, "airflow-secrets-2") volume_mount = k8s.V1VolumeMount( - name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True + name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True ) k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo", "10"], @@ -247,7 +244,8 @@ Now the user can use the ``kubernetes.client.models`` class as a single point of name="airflow-test-pod", task_id="task", is_delete_operator_pod=True, - hostnetwork=False) + hostnetwork=False, + ) We decided to keep the Secret class as users seem to really like that simplifies the complexity of mounting @@ -445,9 +443,9 @@ While in the deprecated version a user would mount a volume using the following "mountPath": "/foo/", "name": "example-kubernetes-test-volume", }, - ] + ], } - } + }, ) In the new model a user can accomplish the same thing using the following code under the ``pod_override`` key: @@ -459,30 +457,29 @@ In the new model a user can accomplish the same thing using the following code u second_task = PythonOperator( task_id="four_task", python_callable=test_volume_mount, - 
executor_config={"pod_override": k8s.V1Pod( - spec=k8s.V1PodSpec( - containers=[ - k8s.V1Container( - name="base", - volume_mounts=[ - k8s.V1VolumeMount( - mount_path="/foo/", - name="example-kubernetes-test-volume" - ) - ] - ) - ], - volumes=[ - k8s.V1Volume( - name="example-kubernetes-test-volume", - host_path=k8s.V1HostPathVolumeSource( - path="/tmp/" + executor_config={ + "pod_override": k8s.V1Pod( + spec=k8s.V1PodSpec( + containers=[ + k8s.V1Container( + name="base", + volume_mounts=[ + k8s.V1VolumeMount( + mount_path="/foo/", + name="example-kubernetes-test-volume", + ) + ], + ) + ], + volumes=[ + k8s.V1Volume( + name="example-kubernetes-test-volume", + host_path=k8s.V1HostPathVolumeSource(path="/tmp/"), ) - ) - ] + ], + ) ) - ) - } + }, ) For Airflow 2.0, the traditional ``executor_config`` will continue operation with a deprecation warning, @@ -551,9 +548,10 @@ Before: .. code-block:: python from airflow.kubernetes.pod import Port - port = Port('http', 80) + + port = Port("http", 80) k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -566,9 +564,10 @@ After: .. code-block:: python from kubernetes.client import models as k8s - port = k8s.V1ContainerPort(name='http', container_port=80) + + port = k8s.V1ContainerPort(name="http", container_port=80) k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -583,12 +582,12 @@ Before: .. code-block:: python from airflow.kubernetes.volume_mount import VolumeMount - volume_mount = VolumeMount('test-volume', - mount_path='/root/mount_file', - sub_path=None, - read_only=True) + + volume_mount = VolumeMount( + "test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True + ) k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -601,11 +600,12 @@ After: .. code-block:: python from kubernetes.client import models as k8s + volume_mount = k8s.V1VolumeMount( - name='test-volume', mount_path='/root/mount_file', sub_path=None, read_only=True + name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True ) k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -622,14 +622,10 @@ Before: from airflow.kubernetes.volume import Volume - volume_config = { - 'persistentVolumeClaim': { - 'claimName': 'test-volume' - } - } - volume = Volume(name='test-volume', configs=volume_config) + volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}} + volume = Volume(name="test-volume", configs=volume_config) k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -642,12 +638,15 @@ After: .. code-block:: python from kubernetes.client import models as k8s + volume = k8s.V1Volume( - name='test-volume', - persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name='test-volume'), + name="test-volume", + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource( + claim_name="test-volume" + ), ) k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -662,7 +661,7 @@ Before: .. 
code-block:: python k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -677,17 +676,12 @@ After: from kubernetes.client import models as k8s env_vars = [ - k8s.V1EnvVar( - name="ENV1", - value="val1" - ), - k8s.V1EnvVar( - name="ENV2", - value="val2" - )] + k8s.V1EnvVar(name="ENV1", value="val1"), + k8s.V1EnvVar(name="ENV2", value="val2"), + ] k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -707,7 +701,7 @@ Before: from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -725,15 +719,13 @@ After: k8s.V1EnvVar( name="ENV3", value_from=k8s.V1EnvVarSource( - field_ref=k8s.V1ObjectFieldSelector( - field_path="status.podIP" - ) - ) + field_ref=k8s.V1ObjectFieldSelector(field_path="status.podIP") + ), ) ] k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -750,12 +742,12 @@ Before: .. code-block:: python k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], - configmaps=['test-configmap'], - task_id="task" + configmaps=["test-configmap"], + task_id="task", ) @@ -765,20 +757,18 @@ After: from kubernetes.client import models as k8s - configmap ="test-configmap" - env_from = [k8s.V1EnvFromSource( - config_map_ref=k8s.V1ConfigMapEnvSource( - name=configmap - ) - )] + configmap = "test-configmap" + env_from = [ + k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap)) + ] k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], env_from=env_from, - task_id="task" + task_id="task", ) @@ -789,15 +779,15 @@ Before: .. code-block:: python resources = { - 'limit_cpu': 0.25, - 'limit_memory': '64Mi', - 'limit_ephemeral_storage': '2Gi', - 'request_cpu': '250m', - 'request_memory': '64Mi', - 'request_ephemeral_storage': '1Gi', + "limit_cpu": 0.25, + "limit_memory": "64Mi", + "limit_ephemeral_storage": "2Gi", + "request_cpu": "250m", + "request_memory": "64Mi", + "request_ephemeral_storage": "1Gi", } k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -815,21 +805,17 @@ After: from kubernetes.client import models as k8s - resources=k8s.V1ResourceRequirements( - requests={ - 'memory': '64Mi', - 'cpu': '250m', - 'ephemeral-storage': '1Gi' - }, + resources = k8s.V1ResourceRequirements( + requests={"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"}, limits={ - 'memory': '64Mi', - 'cpu': 0.25, - 'nvidia.com/gpu': None, - 'ephemeral-storage': '2Gi' - } + "memory": "64Mi", + "cpu": 0.25, + "nvidia.com/gpu": None, + "ephemeral-storage": "2Gi", + }, ) k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], @@ -849,14 +835,14 @@ Before: .. 
code-block:: python k = KubernetesPodOperator( - namespace='default', + namespace="default", image="ubuntu:16.04", cmds=["bash", "-cx"], arguments=["echo 10"], name="test", task_id="task", image_pull_secrets="fake-secret", - cluster_context='default' + cluster_context="default", ) After: @@ -864,9 +850,9 @@ After: .. code-block:: python quay_k8s = KubernetesPodOperator( - namespace='default', - image='quay.io/apache/bash', - image_pull_secrets=[k8s.V1LocalObjectReference('testquay')], + namespace="default", + image="quay.io/apache/bash", + image_pull_secrets=[k8s.V1LocalObjectReference("testquay")], cmds=["bash", "-cx"], name="airflow-private-image-pod", task_id="task-two", @@ -1136,19 +1122,19 @@ non-RBAC UI (``flask-admin`` based UI), update it to use ``flask_appbuilder_view class TestView(BaseView): - @expose('/') + @expose("/") def test(self): # in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html return self.render("test_plugin/test.html", content="Hello galaxy!") + v = TestView(category="Test Plugin", name="Test View") ml = MenuLink( - category='Test Plugin', - name='Test Menu Link', - url='https://airflow.apache.org/' + category="Test Plugin", name="Test Menu Link", url="https://airflow.apache.org/" ) + class AirflowTestPlugin(AirflowPlugin): admin_views = [v] menu_links = [ml] @@ -1161,6 +1147,7 @@ non-RBAC UI (``flask-admin`` based UI), update it to use ``flask_appbuilder_view from airflow.plugins_manager import AirflowPlugin from flask_appbuilder import expose, BaseView as AppBuilderBaseView + class TestAppBuilderBaseView(AppBuilderBaseView): default_view = "test" @@ -1168,16 +1155,21 @@ non-RBAC UI (``flask-admin`` based UI), update it to use ``flask_appbuilder_view def test(self): return self.render_template("test_plugin/test.html", content="Hello galaxy!") + v_appbuilder_view = TestAppBuilderBaseView() - v_appbuilder_package = {"name": "Test View", - "category": "Test Plugin", - "view": v_appbuilder_view} + v_appbuilder_package = { + "name": "Test View", + "category": "Test Plugin", + "view": v_appbuilder_view, + } # Creating a flask appbuilder Menu Item - appbuilder_mitem = {"name": "Google", - "category": "Search", - "category_icon": "fa-th", - "href": "https://www.google.com"} + appbuilder_mitem = { + "name": "Google", + "category": "Search", + "category_icon": "fa-th", + "href": "https://www.google.com", + } # Defining the plugin class diff --git a/tests/dags_corrupted/README.md b/tests/dags_corrupted/README.md index 2cf2aaa..5c3a027 100644 --- a/tests/dags_corrupted/README.md +++ b/tests/dags_corrupted/README.md @@ -26,7 +26,8 @@ To access a DAG in this folder, use the following code inside a unit test. ```python TEST_DAG_FOLDER = os.path.join( - os.path.dirname(os.path.realpath(__file__)), 'dags_corrupted') + os.path.dirname(os.path.realpath(__file__)), "dags_corrupted" +) dagbag = DagBag(dag_folder=TEST_DAG_FOLDER) dag = dagbag.get_dag(dag_id) diff --git a/tests/test_utils/perf/perf_kit/__init__.py b/tests/test_utils/perf/perf_kit/__init__.py index 25583fc..f9b0fe2 100644 --- a/tests/test_utils/perf/perf_kit/__init__.py +++ b/tests/test_utils/perf/perf_kit/__init__.py @@ -66,16 +66,18 @@ Suppose we have the following fragment of the file with tests. .. 
code-block:: python - prev = dag.previous_schedule(_next) - prev_local = local_tz.convert(prev) + prev = dag.previous_schedule(_next) + prev_local = local_tz.convert(prev) + + assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00" + assert prev.isoformat() == "2018-03-24T02:00:00+00:00" - assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00" - assert prev.isoformat() == "2018-03-24T02:00:00+00:00" def test_bulk_write_to_db(self): clear_db_dags() dags = [ - DAG(f'dag-bulk-sync-{i}', start_date=DEFAULT_DATE, tags=["test-dag"]) for i in range(0, 4) + DAG(f"dag-bulk-sync-{i}", start_date=DEFAULT_DATE, tags=["test-dag"]) + for i in range(0, 4) ] with assert_queries_count(3): @@ -87,19 +89,21 @@ queries in it. .. code-block:: python :emphasize-lines: 6-8 - prev = dag.previous_schedule(_next) - prev_local = local_tz.convert(prev) + prev = dag.previous_schedule(_next) + prev_local = local_tz.convert(prev) - assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00" - assert prev.isoformat() == "2018-03-24T02:00:00+00:00" + assert prev_local.isoformat() == "2018-03-24T03:00:00+01:00" + assert prev.isoformat() == "2018-03-24T02:00:00+00:00" from tests.utils.perf.perf_kit.sqlalchemy import trace_queries + @trace_queries def test_bulk_write_to_db(self): clear_db_dags() dags = [ - DAG(f'dag-bulk-sync-{i}', start_date=DEFAULT_DATE, tags=["test-dag"]) for i in range(0, 4) + DAG(f"dag-bulk-sync-{i}", start_date=DEFAULT_DATE, tags=["test-dag"]) + for i in range(0, 4) ] with assert_queries_count(3):