This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 969a275df02a81d1f3176ca010e565fb950e6d35 Author: Jarek Potiuk <[email protected]> AuthorDate: Fri Feb 18 01:08:42 2022 +0100 Clarify pendulum use in timezone cases (#21646) It is important to use Pendulum whenever a timezone is used, because the stdlib timezone implementation comes with a number of limitations. However, our documentation was not very clear about this; in particular, some examples showed standard datetime being used in DAGs, which could mislead our users into continuing to use datetime when they use timezones. This PR clarifies and stresses that using pendulum is necessary when a timezone is used. It also points to the documentation when serialization throws an error about not using Pendulum, so that users can learn the reasoning behind it. This is the first part of the change - the follow-up will change all provider examples to also use timezones and pendulum explicitly. See also #20070 (cherry picked from commit f011da235f705411239d992bc3c92f1c072f89a9) --- CONTRIBUTING.rst | 2 +- UPDATING.md | 6 ++--- airflow/example_dags/example_bash_operator.py | 8 ++++--- .../example_branch_datetime_operator.py | 14 ++++++------ .../example_branch_day_of_week_operator.py | 4 ++-- airflow/example_dags/example_branch_labels.py | 7 ++++-- airflow/example_dags/example_branch_operator.py | 5 +++-- .../example_branch_python_dop_operator_3.py | 4 ++-- airflow/example_dags/example_complex.py | 4 ++-- airflow/example_dags/example_dag_decorator.py | 9 ++++++-- .../example_external_task_marker_dag.py | 4 ++-- .../example_dags/example_kubernetes_executor.py | 5 +++-- .../example_latest_only_with_trigger.py | 8 ++++--- airflow/example_dags/example_nested_branch_dag.py | 4 ++-- .../example_passing_params_via_test_command.py | 8 ++++--- airflow/example_dags/example_python_operator.py | 5 +++-- .../example_dags/example_short_circuit_operator.py | 4 ++-- airflow/example_dags/example_skip_dag.py | 9 ++++++-- 
airflow/example_dags/example_sla_dag.py | 8 ++++--- 
airflow/example_dags/example_task_group.py | 7 ++++-- .../example_dags/example_task_group_decorator.py | 7 ++++-- .../example_time_delta_sensor_async.py | 8 ++++--- .../example_dags/example_trigger_controller_dag.py | 4 ++-- airflow/example_dags/example_trigger_target_dag.py | 4 ++-- airflow/example_dags/example_xcom.py | 4 ++-- airflow/example_dags/example_xcomargs.py | 7 +++--- airflow/example_dags/subdags/subdag.py | 4 ++-- airflow/example_dags/tutorial_etl_dag.py | 5 +++-- airflow/example_dags/tutorial_taskflow_api_etl.py | 10 +++++++-- airflow/models/dag.py | 3 +++ airflow/serialization/serialized_objects.py | 6 ++++- docs/apache-airflow/best-practices.rst | 14 +++++++----- docs/apache-airflow/concepts/dags.rst | 26 +++++++++++++++------- docs/apache-airflow/concepts/operators.rst | 2 +- docs/apache-airflow/dag-run.rst | 11 ++++----- docs/apache-airflow/executor/kubernetes.rst | 5 +++-- docs/apache-airflow/faq.rst | 24 +++++++++++++++----- docs/apache-airflow/howto/timetable.rst | 8 +++---- docs/apache-airflow/lineage.rst | 7 +++--- .../logging-monitoring/callbacks.rst | 8 ++++--- docs/apache-airflow/timezone.rst | 19 +++++++++++----- docs/apache-airflow/tutorial.rst | 15 ++++++++++--- .../extending/embedding-dags/test_dag.py | 10 +++++---- 43 files changed, 216 insertions(+), 120 deletions(-) diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index d598742..838f658 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -1376,7 +1376,7 @@ We are using certain prefixes for email subjects for different purposes. Start y Voting is governed by the rules described in `Voting <https://www.apache.org/foundation/voting.html>`_ We are all devoting our time for community as individuals who except for being active in Apache Airflow have -families, daily jobs, right for vacation. Sometimes we are in different time zones or simply are +families, daily jobs, right for vacation. 
Sometimes we are in different timezones or simply are busy with day-to-day duties that our response time might be delayed. For us it's crucial to remember to respect each other in the project with no formal structure. There are no managers, departments, most of us is autonomous in our opinions, decisions. diff --git a/UPDATING.md b/UPDATING.md index 2ed4aac..0273d5a 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -184,7 +184,7 @@ Similarly, `DAG.concurrency` has been renamed to `DAG.max_active_tasks`. ```python dag = DAG( dag_id="example_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, concurrency=3, ) @@ -195,7 +195,7 @@ dag = DAG( ```python dag = DAG( dag_id="example_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, max_active_tasks=3, ) @@ -3216,7 +3216,7 @@ Type "help", "copyright", "credits" or "license" for more information. >>> from airflow.models.dag import DAG >>> from airflow.operators.dummy import DummyOperator >>> ->>> dag = DAG('simple_dag', start_date=datetime(2017, 9, 1)) +>>> dag = DAG('simple_dag', start_date=pendulum.datetime(2017, 9, 1, tz="UTC")) >>> >>> task = DummyOperator(task_id='task_1', dag=dag) >>> diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index f679f8d..8204592 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -18,7 +18,9 @@ """Example DAG demonstrating the usage of the BashOperator.""" -from datetime import datetime, timedelta +import datetime + +import pendulum from airflow import DAG from airflow.operators.bash import BashOperator @@ -27,9 +29,9 @@ from airflow.operators.dummy import DummyOperator with DAG( dag_id='example_bash_operator', schedule_interval='0 0 * * *', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, - 
dagrun_timeout=timedelta(minutes=60), + dagrun_timeout=datetime.timedelta(minutes=60), tags=['example', 'example2'], params={"example_key": "example_value"}, ) as dag: diff --git a/airflow/example_dags/example_branch_datetime_operator.py b/airflow/example_dags/example_branch_datetime_operator.py index bdc50ca..76b109f 100644 --- a/airflow/example_dags/example_branch_datetime_operator.py +++ b/airflow/example_dags/example_branch_datetime_operator.py @@ -20,7 +20,7 @@ Example DAG demonstrating the usage of DateTimeBranchOperator with datetime as well as time objects as targets. """ -import datetime +import pendulum from airflow import DAG from airflow.operators.datetime import BranchDateTimeOperator @@ -28,7 +28,7 @@ from airflow.operators.dummy import DummyOperator dag = DAG( dag_id="example_branch_datetime_operator", - start_date=datetime.datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], schedule_interval="@daily", @@ -42,8 +42,8 @@ cond1 = BranchDateTimeOperator( task_id='datetime_branch', follow_task_ids_if_true=['date_in_range'], follow_task_ids_if_false=['date_outside_range'], - target_upper=datetime.datetime(2020, 10, 10, 15, 0, 0), - target_lower=datetime.datetime(2020, 10, 10, 14, 0, 0), + target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0), + target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0), dag=dag, ) @@ -54,7 +54,7 @@ cond1 >> [dummy_task_1, dummy_task_2] dag = DAG( dag_id="example_branch_datetime_operator_2", - start_date=datetime.datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], schedule_interval="@daily", @@ -67,8 +67,8 @@ cond2 = BranchDateTimeOperator( task_id='datetime_branch', follow_task_ids_if_true=['date_in_range'], follow_task_ids_if_false=['date_outside_range'], - target_upper=datetime.time(0, 0, 0), - target_lower=datetime.time(15, 0, 0), + target_upper=pendulum.time(0, 0, 0), + target_lower=pendulum.time(15, 0, 0), 
dag=dag, ) diff --git a/airflow/example_dags/example_branch_day_of_week_operator.py b/airflow/example_dags/example_branch_day_of_week_operator.py index 6d1a331..dae303a 100644 --- a/airflow/example_dags/example_branch_day_of_week_operator.py +++ b/airflow/example_dags/example_branch_day_of_week_operator.py @@ -19,7 +19,7 @@ """ Example DAG demonstrating the usage of BranchDayOfWeekOperator. """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -27,7 +27,7 @@ from airflow.operators.weekday import BranchDayOfWeekOperator with DAG( dag_id="example_weekday_branch_operator", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], schedule_interval="@daily", diff --git a/airflow/example_dags/example_branch_labels.py b/airflow/example_dags/example_branch_labels.py index bd6ce09..2215bcf 100644 --- a/airflow/example_dags/example_branch_labels.py +++ b/airflow/example_dags/example_branch_labels.py @@ -19,14 +19,17 @@ """ Example DAG demonstrating the usage of labels with different branches. 
""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.utils.edgemodifier import Label with DAG( - "example_branch_labels", schedule_interval="@daily", start_date=datetime(2021, 1, 1), catchup=False + "example_branch_labels", + schedule_interval="@daily", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, ) as dag: ingest = DummyOperator(task_id="ingest") analyse = DummyOperator(task_id="analyze") diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index 69f939e..eaa1532 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -19,7 +19,8 @@ """Example DAG demonstrating the usage of the BranchPythonOperator.""" import random -from datetime import datetime + +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -29,7 +30,7 @@ from airflow.utils.trigger_rule import TriggerRule with DAG( dag_id='example_branch_operator', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval="@daily", tags=['example', 'example2'], diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py index 09d96be..d85eda1 100644 --- a/airflow/example_dags/example_branch_python_dop_operator_3.py +++ b/airflow/example_dags/example_branch_python_dop_operator_3.py @@ -20,7 +20,7 @@ Example DAG demonstrating the usage of BranchPythonOperator with depends_on_past=True, where tasks may be run or skipped on alternating runs. 
""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -49,7 +49,7 @@ def should_run(**kwargs): with DAG( dag_id='example_branch_dop_operator_v3', schedule_interval='*/1 * * * *', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, default_args={'depends_on_past': True}, tags=['example'], diff --git a/airflow/example_dags/example_complex.py b/airflow/example_dags/example_complex.py index a141236..22e1906 100644 --- a/airflow/example_dags/example_complex.py +++ b/airflow/example_dags/example_complex.py @@ -19,7 +19,7 @@ """ Example Airflow DAG that shows the complex DAG structure. """ -from datetime import datetime +import pendulum from airflow import models from airflow.models.baseoperator import chain @@ -28,7 +28,7 @@ from airflow.operators.bash import BashOperator with models.DAG( dag_id="example_complex", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example', 'example2', 'example3'], ) as dag: diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py index 66b0fa4..af1438c 100644 --- a/airflow/example_dags/example_dag_decorator.py +++ b/airflow/example_dags/example_dag_decorator.py @@ -15,10 +15,10 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
-from datetime import datetime from typing import Any, Dict import httpx +import pendulum from airflow.decorators import dag, task from airflow.models.baseoperator import BaseOperator @@ -37,7 +37,12 @@ class GetRequestOperator(BaseOperator): # [START dag_decorator_usage] -@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) +@dag( + schedule_interval=None, + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=['example'], +) def example_dag_decorator(email: str = '[email protected]'): """ DAG to send server IP to email. diff --git a/airflow/example_dags/example_external_task_marker_dag.py b/airflow/example_dags/example_external_task_marker_dag.py index 851a7ad..eed2f72 100644 --- a/airflow/example_dags/example_external_task_marker_dag.py +++ b/airflow/example_dags/example_external_task_marker_dag.py @@ -37,13 +37,13 @@ interval till one of the following will happen: exception """ -import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor -start_date = datetime.datetime(2015, 1, 1) +start_date = pendulum.datetime(2021, 1, 1, tz="UTC") with DAG( dag_id="example_external_task_marker_parent", diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py index f984909..6318d51 100644 --- a/airflow/example_dags/example_kubernetes_executor.py +++ b/airflow/example_dags/example_kubernetes_executor.py @@ -20,7 +20,8 @@ This is an example dag for using a Kubernetes Executor Configuration. 
""" import logging import os -from datetime import datetime + +import pendulum from airflow import DAG from airflow.configuration import conf @@ -45,7 +46,7 @@ if k8s: with DAG( dag_id='example_kubernetes_executor', schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example3'], ) as dag: diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py index 76b5f63..67f004a 100644 --- a/airflow/example_dags/example_latest_only_with_trigger.py +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -20,7 +20,9 @@ Example LatestOnlyOperator and TriggerRule interactions """ # [START example] -import datetime as dt +import datetime + +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -29,8 +31,8 @@ from airflow.utils.trigger_rule import TriggerRule with DAG( dag_id='latest_only_with_trigger', - schedule_interval=dt.timedelta(hours=4), - start_date=dt.datetime(2021, 1, 1), + schedule_interval=datetime.timedelta(hours=4), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example3'], ) as dag: diff --git a/airflow/example_dags/example_nested_branch_dag.py b/airflow/example_dags/example_nested_branch_dag.py index add81a9..27e7105 100644 --- a/airflow/example_dags/example_nested_branch_dag.py +++ b/airflow/example_dags/example_nested_branch_dag.py @@ -21,7 +21,7 @@ Example DAG demonstrating a workflow with nested branching. The join tasks are c ``none_failed_min_one_success`` trigger rule such that they are skipped whenever their corresponding ``BranchPythonOperator`` are skipped. 
""" -from datetime import datetime +import pendulum from airflow.models import DAG from airflow.operators.dummy import DummyOperator @@ -30,7 +30,7 @@ from airflow.utils.trigger_rule import TriggerRule with DAG( dag_id="example_nested_branch_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval="@daily", tags=["example"], diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py index d4781af..f97f941 100644 --- a/airflow/example_dags/example_passing_params_via_test_command.py +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -18,10 +18,12 @@ """Example DAG demonstrating the usage of the params arguments in templated arguments.""" +import datetime import os -from datetime import datetime, timedelta from textwrap import dedent +import pendulum + from airflow import DAG from airflow.decorators import task from airflow.operators.bash import BashOperator @@ -61,9 +63,9 @@ def print_env_vars(test_mode=None): with DAG( "example_passing_params_via_test_command", schedule_interval='*/1 * * * *', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, - dagrun_timeout=timedelta(minutes=4), + dagrun_timeout=datetime.timedelta(minutes=4), tags=['example'], ) as dag: run_this = my_py_command(params={"miff": "agg"}) diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index d533d84..0f9a7fc 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -23,9 +23,10 @@ virtual environment. 
import logging import shutil import time -from datetime import datetime from pprint import pprint +import pendulum + from airflow import DAG from airflow.decorators import task @@ -34,7 +35,7 @@ log = logging.getLogger(__name__) with DAG( dag_id='example_python_operator', schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py index d349685..4c1187a 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -17,7 +17,7 @@ # under the License. """Example DAG demonstrating the usage of the ShortCircuitOperator.""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.models.baseoperator import chain @@ -26,7 +26,7 @@ from airflow.operators.python import ShortCircuitOperator with DAG( dag_id='example_short_circuit_operator', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py index cb664e7..0e67ed1 100644 --- a/airflow/example_dags/example_skip_dag.py +++ b/airflow/example_dags/example_skip_dag.py @@ -18,7 +18,7 @@ """Example DAG demonstrating the DummyOperator and a custom DummySkipOperator which skips by default.""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.exceptions import AirflowSkipException @@ -54,6 +54,11 @@ def create_test_pipeline(suffix, trigger_rule): join >> final -with DAG(dag_id='example_skip_dag', start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) as dag: +with DAG( + dag_id='example_skip_dag', + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=['example'], +) as dag: 
create_test_pipeline('1', TriggerRule.ALL_SUCCESS) create_test_pipeline('2', TriggerRule.ONE_SUCCESS) diff --git a/airflow/example_dags/example_sla_dag.py b/airflow/example_dags/example_sla_dag.py index 7a46bc4..0db6bc1 100644 --- a/airflow/example_dags/example_sla_dag.py +++ b/airflow/example_dags/example_sla_dag.py @@ -15,8 +15,10 @@ # specific language governing permissions and limitations # under the License. +import datetime import time -from datetime import datetime, timedelta + +import pendulum from airflow.decorators import dag, task @@ -39,13 +41,13 @@ def sla_callback(dag, task_list, blocking_task_list, slas, blocking_tis): @dag( schedule_interval="*/2 * * * *", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, sla_miss_callback=sla_callback, default_args={'email': "[email protected]"}, ) def example_sla_dag(): - @task(sla=timedelta(seconds=10)) + @task(sla=datetime.timedelta(seconds=10)) def sleep_20(): """Sleep for 20 seconds""" time.sleep(20) diff --git a/airflow/example_dags/example_task_group.py b/airflow/example_dags/example_task_group.py index d81bf00..46f709e 100644 --- a/airflow/example_dags/example_task_group.py +++ b/airflow/example_dags/example_task_group.py @@ -17,7 +17,7 @@ # under the License. 
"""Example DAG demonstrating the usage of the TaskGroup.""" -from datetime import datetime +import pendulum from airflow.models.dag import DAG from airflow.operators.bash import BashOperator @@ -26,7 +26,10 @@ from airflow.utils.task_group import TaskGroup # [START howto_task_group] with DAG( - dag_id="example_task_group", start_date=datetime(2021, 1, 1), catchup=False, tags=["example"] + dag_id="example_task_group", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=["example"], ) as dag: start = DummyOperator(task_id="start") diff --git a/airflow/example_dags/example_task_group_decorator.py b/airflow/example_dags/example_task_group_decorator.py index 0e53a98..30f9d6f 100644 --- a/airflow/example_dags/example_task_group_decorator.py +++ b/airflow/example_dags/example_task_group_decorator.py @@ -18,7 +18,7 @@ """Example DAG demonstrating the usage of the @taskgroup decorator.""" -from datetime import datetime +import pendulum from airflow.decorators import task, task_group from airflow.models.dag import DAG @@ -65,7 +65,10 @@ def task_group_function(value): # Executing Tasks and TaskGroups with DAG( - dag_id="example_task_group_decorator", start_date=datetime(2021, 1, 1), catchup=False, tags=["example"] + dag_id="example_task_group_decorator", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=["example"], ) as dag: start_task = task_start() end_task = task_end() diff --git a/airflow/example_dags/example_time_delta_sensor_async.py b/airflow/example_dags/example_time_delta_sensor_async.py index ce8cab0..1a7126a 100644 --- a/airflow/example_dags/example_time_delta_sensor_async.py +++ b/airflow/example_dags/example_time_delta_sensor_async.py @@ -21,7 +21,9 @@ Example DAG demonstrating ``TimeDeltaSensorAsync``, a drop in replacement for `` defers and doesn't occupy a worker slot while it waits """ -from datetime import datetime, timedelta +import datetime + +import pendulum from airflow import DAG from 
airflow.operators.dummy import DummyOperator @@ -30,10 +32,10 @@ from airflow.sensors.time_delta import TimeDeltaSensorAsync with DAG( dag_id="example_time_delta_sensor_async", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: - wait = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(seconds=10)) + wait = TimeDeltaSensorAsync(task_id="wait", delta=datetime.timedelta(seconds=10)) finish = DummyOperator(task_id="finish") wait >> finish diff --git a/airflow/example_dags/example_trigger_controller_dag.py b/airflow/example_dags/example_trigger_controller_dag.py index 27df3d2..a017c9a 100644 --- a/airflow/example_dags/example_trigger_controller_dag.py +++ b/airflow/example_dags/example_trigger_controller_dag.py @@ -21,14 +21,14 @@ Example usage of the TriggerDagRunOperator. This example holds 2 DAGs: 1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG 2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.trigger_dagrun import TriggerDagRunOperator with DAG( dag_id="example_trigger_controller_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval="@once", tags=['example'], diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py index 41aecf1..64ccb59 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -21,7 +21,7 @@ Example usage of the TriggerDagRunOperator. This example holds 2 DAGs: 1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG 2. 
2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG """ -from datetime import datetime +import pendulum from airflow import DAG from airflow.decorators import task @@ -41,7 +41,7 @@ def run_this_func(dag_run=None): with DAG( dag_id="example_trigger_target_dag", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval=None, tags=['example'], diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py index 405d5c5..b55d4e5d 100644 --- a/airflow/example_dags/example_xcom.py +++ b/airflow/example_dags/example_xcom.py @@ -17,7 +17,7 @@ # under the License. """Example DAG demonstrating the usage of XComs.""" -from datetime import datetime +import pendulum from airflow import DAG from airflow.decorators import task @@ -64,7 +64,7 @@ def pull_value_from_bash_push(ti=None): with DAG( 'example_xcom', schedule_interval="@once", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/example_xcomargs.py b/airflow/example_dags/example_xcomargs.py index 7e0cdd9..00af472 100644 --- a/airflow/example_dags/example_xcomargs.py +++ b/airflow/example_dags/example_xcomargs.py @@ -18,7 +18,8 @@ """Example DAG demonstrating the usage of the XComArgs.""" import logging -from datetime import datetime + +import pendulum from airflow import DAG from airflow.decorators import task @@ -41,7 +42,7 @@ def print_value(value, ts=None): with DAG( dag_id='example_xcom_args', - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval=None, tags=['example'], @@ -50,7 +51,7 @@ with DAG( with DAG( "example_xcom_args_with_operators", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval=None, tags=['example'], diff --git 
a/airflow/example_dags/subdags/subdag.py b/airflow/example_dags/subdags/subdag.py index d337a03..7c91309 100644 --- a/airflow/example_dags/subdags/subdag.py +++ b/airflow/example_dags/subdags/subdag.py @@ -19,7 +19,7 @@ """Helper function to generate a DAG and operators given some arguments.""" # [START subdag] -from datetime import datetime +import pendulum from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -38,7 +38,7 @@ def subdag(parent_dag_name, child_dag_name, args): dag_subdag = DAG( dag_id=f'{parent_dag_name}.{child_dag_name}', default_args=args, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, schedule_interval="@daily", ) diff --git a/airflow/example_dags/tutorial_etl_dag.py b/airflow/example_dags/tutorial_etl_dag.py index dd18449..d039a73 100644 --- a/airflow/example_dags/tutorial_etl_dag.py +++ b/airflow/example_dags/tutorial_etl_dag.py @@ -24,9 +24,10 @@ This ETL DAG is demonstrating an Extract -> Transform -> Load pipeline # [START tutorial] # [START import_module] import json -from datetime import datetime from textwrap import dedent +import pendulum + # The DAG object; we'll need this to instantiate a DAG from airflow import DAG @@ -45,7 +46,7 @@ with DAG( # [END default_args] description='ETL DAG tutorial', schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=['example'], ) as dag: diff --git a/airflow/example_dags/tutorial_taskflow_api_etl.py b/airflow/example_dags/tutorial_taskflow_api_etl.py index 3b0ba51..f6af78f 100644 --- a/airflow/example_dags/tutorial_taskflow_api_etl.py +++ b/airflow/example_dags/tutorial_taskflow_api_etl.py @@ -20,7 +20,8 @@ # [START tutorial] # [START import_module] import json -from datetime import datetime + +import pendulum from airflow.decorators import dag, task @@ -28,7 +29,12 @@ from airflow.decorators import dag, task # [START instantiate_dag] 
-@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) +@dag( + schedule_interval=None, + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + catchup=False, + tags=['example'], +) def tutorial_taskflow_api_etl(): """ ### TaskFlow API Tutorial Documentation diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 477e597..150220c 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -187,6 +187,9 @@ class DAG(LoggingMixin): DAGs essentially act as namespaces for tasks. A task_id can only be added once to a DAG. + Note that if you plan to use time zones all the dates provided should be pendulum + dates. See :ref:`timezone_aware_dags`. + :param dag_id: The id of the DAG; must consist exclusively of alphanumeric characters, dashes, dots and underscores (all ASCII) :type dag_id: str diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index bc8361d..ebe20b0 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -42,6 +42,7 @@ from airflow.serialization.json_schema import Validator, load_dag_schema from airflow.settings import json from airflow.timetables.base import Timetable from airflow.utils.code_utils import get_python_source +from airflow.utils.docs import get_docs_url from airflow.utils.module_loading import as_importable_string, import_string from airflow.utils.task_group import TaskGroup @@ -113,7 +114,10 @@ def encode_timezone(var: Timezone) -> Union[str, int]: return var.offset if isinstance(var, Timezone): return var.name - raise ValueError(f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}") + raise ValueError( + f"DAG timezone should be a pendulum.tz.Timezone, not {var!r}. 
" + f"See {get_docs_url('timezone.html#time-zone-aware-dags')}" + ) def decode_timezone(var: Union[str, int]) -> Timezone: diff --git a/docs/apache-airflow/best-practices.rst b/docs/apache-airflow/best-practices.rst index 951e6b4..3b01b3e 100644 --- a/docs/apache-airflow/best-practices.rst +++ b/docs/apache-airflow/best-practices.rst @@ -121,7 +121,7 @@ Bad example: .. code-block:: python - from datetime import datetime + import pendulum from airflow import DAG from airflow.operators.python import PythonOperator @@ -131,7 +131,7 @@ Bad example: with DAG( dag_id="example_python_operator", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: @@ -151,7 +151,7 @@ Good example: .. code-block:: python - from datetime import datetime + import pendulum from airflow import DAG from airflow.operators.python import PythonOperator @@ -159,7 +159,7 @@ Good example: with DAG( dag_id="example_python_operator", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: @@ -237,12 +237,13 @@ Then you can import and use the ``ALL_TASKS`` constant in all your DAGs like tha .. code-block:: python + import pendulum from my_company_utils.common import ALL_TASKS with DAG( dag_id="my_dag", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) as dag: for task in ALL_TASKS: @@ -486,13 +487,14 @@ This is an example test want to verify the structure of a code-generated DAG aga .. 
code-block:: python import datetime + import pendulum import pytest from airflow.utils.state import DagRunState from airflow.utils.types import DagRunType - DATA_INTERVAL_START = datetime.datetime(2021, 9, 13) + DATA_INTERVAL_START = pendulum.datetime(2021, 9, 13, tz="UTC") DATA_INTERVAL_END = DATA_INTERVAL_START + datetime.timedelta(days=1) TEST_DAG_ID = "my_custom_operator_dag" diff --git a/docs/apache-airflow/concepts/dags.rst b/docs/apache-airflow/concepts/dags.rst index e339abe..32d21ce 100644 --- a/docs/apache-airflow/concepts/dags.rst +++ b/docs/apache-airflow/concepts/dags.rst @@ -38,19 +38,22 @@ There are three ways to declare a DAG - either you can use a context manager, which will add the DAG to anything inside it implicitly:: with DAG( - "my_dag_name", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False + "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", catchup=False ) as dag: op = DummyOperator(task_id="task") Or, you can use a standard constructor, passing the dag into any operators you use:: - my_dag = DAG("my_dag_name", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False) + my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", catchup=False) op = DummyOperator(task_id="task", dag=my_dag) Or, you can use the ``@dag`` decorator to :ref:`turn a function into a DAG generator <concepts:dag-decorator>`:: - @dag(start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False) + @dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", catchup=False) def generate_dag(): op = DummyOperator(task_id="task") @@ -214,10 +217,11 @@ Default Arguments Often, many Operators inside a DAG need the same set of default arguments (such as their ``retries``). 
Rather than having to specify this individually for every Operator, you can instead pass ``default_args`` to the DAG when you create it, and it will auto-apply them to any operator tied to it:: + import pendulum with DAG( dag_id='my_dag', - start_date=datetime(2016, 1, 1), + start_date=pendulum.datetime(2016, 1, 1, tz="UTC"), schedule_interval='@daily', catchup=False, default_args={'retries': 2}, @@ -390,7 +394,7 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality .. code-block:: python # dags/branch_without_trigger.py - import datetime as dt + import pendulum from airflow.models import DAG from airflow.operators.dummy import DummyOperator @@ -399,7 +403,7 @@ You can also combine this with the :ref:`concepts:depends-on-past` functionality dag = DAG( dag_id="branch_without_trigger", schedule_interval="@once", - start_date=dt.datetime(2019, 2, 28), + start_date=pendulum.datetime(2019, 2, 28, tz="UTC"), ) run_this_first = DummyOperator(task_id="run_this_first", dag=dag) @@ -483,9 +487,11 @@ Dependency relationships can be applied across all tasks in a TaskGroup with the TaskGroup also supports ``default_args`` like DAG, it will overwrite the ``default_args`` in DAG level:: + import pendulum + with DAG( dag_id='dag1', - start_date=datetime(2016, 1, 1), + start_date=pendulum.datetime(2016, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False, default_args={'retries': 1}, @@ -563,9 +569,13 @@ This is especially useful if your tasks are built dynamically from configuration """ ### My great DAG """ + import pendulum dag = DAG( - "my_dag", start_date=datetime(2021, 1, 1), schedule_interval="@daily", catchup=False + "my_dag", + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + schedule_interval="@daily", + catchup=False, ) dag.doc_md = __doc__ diff --git a/docs/apache-airflow/concepts/operators.rst b/docs/apache-airflow/concepts/operators.rst index 13020f1..30ec5ce 100644 --- a/docs/apache-airflow/concepts/operators.rst +++ 
b/docs/apache-airflow/concepts/operators.rst @@ -175,7 +175,7 @@ you can pass ``render_template_as_native_obj=True`` to the DAG as follows: dag = DAG( dag_id="example_template_as_python_object", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, render_template_as_native_obj=True, ) diff --git a/docs/apache-airflow/dag-run.rst b/docs/apache-airflow/dag-run.rst index 62555b1..1a2bbe3 100644 --- a/docs/apache-airflow/dag-run.rst +++ b/docs/apache-airflow/dag-run.rst @@ -113,17 +113,18 @@ in the configuration file. When turned off, the scheduler creates a DAG run only """ from airflow.models.dag import DAG from airflow.operators.bash import BashOperator - from datetime import datetime, timedelta + import datetime + import pendulum dag = DAG( "tutorial", default_args={ "depends_on_past": True, "retries": 1, - "retry_delay": timedelta(minutes=3), + "retry_delay": datetime.timedelta(minutes=3), }, - start_date=datetime(2015, 12, 1), + start_date=pendulum.datetime(2015, 12, 1, tz="UTC"), description="A simple tutorial DAG", schedule_interval="@daily", catchup=False, @@ -225,7 +226,7 @@ Example of a parameterized DAG: .. code-block:: python - from datetime import datetime + import pendulum from airflow import DAG from airflow.operators.bash import BashOperator @@ -233,7 +234,7 @@ Example of a parameterized DAG: dag = DAG( "example_parameterized_dag", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) diff --git a/docs/apache-airflow/executor/kubernetes.rst b/docs/apache-airflow/executor/kubernetes.rst index 341f9fe..496e3e7 100644 --- a/docs/apache-airflow/executor/kubernetes.rst +++ b/docs/apache-airflow/executor/kubernetes.rst @@ -154,7 +154,8 @@ Here is an example of a task with both features: .. 
code-block:: python import os - from datetime import datetime + + import pendulum from airflow import DAG from airflow.decorators import task @@ -166,7 +167,7 @@ Here is an example of a task with both features: with DAG( dag_id="example_pod_template_file", schedule_interval=None, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example3"], ) as dag: diff --git a/docs/apache-airflow/faq.rst b/docs/apache-airflow/faq.rst index 7f72a0d..6f2e778 100644 --- a/docs/apache-airflow/faq.rst +++ b/docs/apache-airflow/faq.rst @@ -149,7 +149,8 @@ until ``min_file_process_interval`` is reached since DAG Parser will look for mo from airflow import DAG from airflow.operators.python_operator import PythonOperator - from datetime import datetime + + import pendulum def create_dag(dag_id, schedule, dag_number, default_args): @@ -157,7 +158,12 @@ until ``min_file_process_interval`` is reached since DAG Parser will look for mo print("Hello World") print("This is DAG: {}".format(str(dag_number))) - dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args) + dag = DAG( + dag_id, + schedule_interval=schedule, + default_args=default_args, + start_date=pendulum.datetime(2021, 9, 13, tz="UTC"), + ) with dag: t1 = PythonOperator(task_id="hello_world", python_callable=hello_world_py) @@ -213,6 +219,14 @@ backfill CLI command, gets overridden by the backfill's ``start_date`` commands. This allows for a backfill on tasks that have ``depends_on_past=True`` to actually start. If this were not the case, the backfill just would not start. +Using time zones +---------------- + +Creating a time zone aware datetime (e.g. DAG's ``start_date``) is quite simple. Just make sure to supply +time zone aware dates using ``pendulum``. Don't try to use standard library +`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_ as they are known to +have limitations and we deliberately disallow using them in DAGs. + ..
_faq:what-does-execution-date-mean: @@ -360,12 +374,12 @@ upstream task. .. code-block:: python + import pendulum + from airflow.decorators import dag, task from airflow.exceptions import AirflowException from airflow.utils.trigger_rule import TriggerRule - from datetime import datetime - @task def a_func(): @@ -379,7 +393,7 @@ upstream task. pass - @dag(schedule_interval="@once", start_date=datetime(2021, 1, 1)) + @dag(schedule_interval="@once", start_date=pendulum.datetime(2021, 1, 1, tz="UTC")) def my_dag(): a = a_func() b = b_func() diff --git a/docs/apache-airflow/howto/timetable.rst b/docs/apache-airflow/howto/timetable.rst index 1fd7102..ed902fc 100644 --- a/docs/apache-airflow/howto/timetable.rst +++ b/docs/apache-airflow/howto/timetable.rst @@ -69,7 +69,7 @@ file: .. code-block:: python - import datetime + import pendulum from airflow import DAG from airflow.example_dags.plugins.workday import AfterWorkdayTimetable @@ -77,7 +77,7 @@ file: with DAG( dag_id="example_after_workday_timetable_dag", - start_date=datetime.datetime(2021, 3, 10), + start_date=pendulum.datetime(2021, 3, 10, tz="UTC"), timetable=AfterWorkdayTimetable(), tags=["example", "timetable"], ) as dag: @@ -190,7 +190,7 @@ For reference, here's our plugin and DAG files in their entirety: .. code-block:: python - import datetime + import pendulum from airflow import DAG from airflow.example_dags.plugins.workday import AfterWorkdayTimetable @@ -199,7 +199,7 @@ For reference, here's our plugin and DAG files in their entirety: with DAG( dag_id="example_workday_timetable", - start_date=datetime.datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), timetable=AfterWorkdayTimetable(), tags=["example", "timetable"], ) as dag: diff --git a/docs/apache-airflow/lineage.rst b/docs/apache-airflow/lineage.rst index 9b8bb71..3a4db94 100644 --- a/docs/apache-airflow/lineage.rst +++ b/docs/apache-airflow/lineage.rst @@ -30,7 +30,8 @@ works. .. 
code-block:: python - from datetime import datetime, timedelta + import datetime + import pendulum from airflow.lineage import AUTO from airflow.lineage.entities import File @@ -42,10 +43,10 @@ works. dag = DAG( dag_id="example_lineage", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="0 0 * * *", catchup=False, - dagrun_timeout=timedelta(minutes=60), + dagrun_timeout=datetime.timedelta(minutes=60), ) f_final = File(url="/tmp/final") diff --git a/docs/apache-airflow/logging-monitoring/callbacks.rst b/docs/apache-airflow/logging-monitoring/callbacks.rst index 77ac594..15bbacb 100644 --- a/docs/apache-airflow/logging-monitoring/callbacks.rst +++ b/docs/apache-airflow/logging-monitoring/callbacks.rst @@ -51,7 +51,9 @@ In the following example, failures in any task call the ``task_failure_alert`` f .. code-block:: python - from datetime import datetime, timedelta + import datetime + import pendulum + from airflow import DAG from airflow.operators.dummy import DummyOperator @@ -67,8 +69,8 @@ In the following example, failures in any task call the ``task_failure_alert`` f with DAG( dag_id="example_callback", schedule_interval=None, - start_date=datetime(2021, 1, 1), - dagrun_timeout=timedelta(minutes=60), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), + dagrun_timeout=datetime.timedelta(minutes=60), catchup=False, on_success_callback=None, on_failure_callback=task_failure_alert, diff --git a/docs/apache-airflow/timezone.rst b/docs/apache-airflow/timezone.rst index 32e5223..bcbbd11 100644 --- a/docs/apache-airflow/timezone.rst +++ b/docs/apache-airflow/timezone.rst @@ -40,6 +40,7 @@ The time zone is set in ``airflow.cfg``. By default it is set to utc, but you ch an arbitrary IANA time zone, e.g. ``Europe/Amsterdam``. It is dependent on ``pendulum``, which is more accurate than ``pytz``. Pendulum is installed when you install Airflow. 
+ Web UI ------ @@ -90,7 +91,11 @@ words if you have a default time zone setting of ``Europe/Amsterdam`` and create .. code-block:: python - dag = DAG("my_dag", start_date=datetime(2017, 1, 1), default_args={"retries": 3}) + dag = DAG( + "my_dag", + start_date=pendulum.datetime(2017, 1, 1, tz="UTC"), + default_args={"retries": 3}, + ) op = BashOperator(task_id="dummy", bash_command="Hello World!", dag=dag) print(op.retries) # 3 @@ -120,19 +125,21 @@ it is therefore important to make sure this setting is equal on all Airflow node .. note:: For more information on setting the configuration, see :doc:`howto/set-config` +.. _timezone_aware_dags: + Time zone aware DAGs -------------------- Creating a time zone aware DAG is quite simple. Just make sure to supply a time zone aware ``start_date`` -using ``pendulum``. +using ``pendulum``. Don't try to use standard library +`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_ as they are known to +have limitations and we deliberately disallow using them in DAGs. .. code-block:: python import pendulum - local_tz = pendulum.timezone("Europe/Amsterdam") - - dag = DAG("my_tz_dag", start_date=datetime(2016, 1, 1, tzinfo=local_tz)) + dag = DAG("my_tz_dag", start_date=pendulum.datetime(2016, 1, 1, tz="Europe/Amsterdam")) op = DummyOperator(task_id="dummy", dag=dag) print(dag.timezone) # <Timezone [Europe/Amsterdam]> @@ -170,6 +177,6 @@ Time deltas Time zone aware DAGs that use ``timedelta`` or ``relativedelta`` schedules respect daylight savings time for the start date but do not adjust for daylight savings time when scheduling subsequent runs. For example, a -DAG with a start date of ``pendulum.datetime(2020, 1, 1, tz="US/Eastern")`` +DAG with a start date of ``pendulum.datetime(2020, 1, 1, tz="US/Eastern")`` and a schedule interval of ``timedelta(days=1)`` will run daily at 05:00 UTC regardless of daylight savings time.
diff --git a/docs/apache-airflow/tutorial.rst b/docs/apache-airflow/tutorial.rst index a8f76bc..085be42 100644 --- a/docs/apache-airflow/tutorial.rst +++ b/docs/apache-airflow/tutorial.rst @@ -230,6 +230,14 @@ Note that when executing your script, Airflow will raise exceptions when it finds cycles in your DAG or when a dependency is referenced more than once. +Using time zones +---------------- + +Creating a time zone aware DAG is quite simple. Just make sure to supply time zone aware dates +using ``pendulum``. Don't try to use standard library +`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_ as they are known to +have limitations and we deliberately disallow using them in DAGs. + Recap ----- Alright, so we have a pretty basic DAG. At this point your code should look @@ -474,7 +482,8 @@ Lets look at our DAG: .. code-block:: python - from datetime import datetime, timedelta + import datetime + import pendulum import requests from airflow.decorators import dag, task @@ -483,9 +492,9 @@ Lets look at our DAG: @dag( schedule_interval="0 0 * * *", - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, - dagrun_timeout=timedelta(minutes=60), + dagrun_timeout=datetime.timedelta(minutes=60), ) def Etl(): @task diff --git a/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py b/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py index 25ceeba..a12f2f6 100644 --- a/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py +++ b/docs/docker-stack/docker-examples/extending/embedding-dags/test_dag.py @@ -17,13 +17,15 @@ # under the License.
# [START dag] """This dag only runs some simple tasks to test Airflow's task execution.""" -from datetime import datetime, timedelta +import datetime + +import pendulum from airflow.models.dag import DAG from airflow.operators.dummy import DummyOperator -now = datetime.now() -now_to_the_hour = (now - timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0) +now = pendulum.now(tz="UTC") +now_to_the_hour = (now - datetime.timedelta(0, 0, 0, 0, 0, 3)).replace(minute=0, second=0, microsecond=0) START_DATE = now_to_the_hour DAG_NAME = 'test_dag_v1' @@ -31,7 +33,7 @@ dag = DAG( DAG_NAME, schedule_interval='*/10 * * * *', default_args={'depends_on_past': True}, - start_date=datetime(2021, 1, 1), + start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, )
