[AIRFLOW-863] Example DAGs should have recent start dates Avoid unnecessary backfills by having start dates of just a few days ago. Adds a utility function airflow.utils.dates.days_ago().
Closes #2068 from jlowin/example-start-date (cherry picked from commit bbfd43df4663547abda4ac6fdc3a6ed730a75b57) Signed-off-by: Bolke de Bruin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3658bf31 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3658bf31 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3658bf31 Branch: refs/heads/v1-8-stable Commit: 3658bf310811cd22651b6c20c5d50bfbd3153025 Parents: 563cc9a Author: Jeremiah Lowin <[email protected]> Authored: Sun Feb 12 15:37:56 2017 -0500 Committer: Bolke de Bruin <[email protected]> Committed: Sat Feb 18 15:26:34 2017 +0100 ---------------------------------------------------------------------- .../example_emr_job_flow_automatic_steps.py | 6 +-- .../example_emr_job_flow_manual_steps.py | 5 ++- .../example_dags/example_qubole_operator.py | 6 +-- .../contrib/example_dags/example_twitter_dag.py | 5 ++- airflow/example_dags/example_bash_operator.py | 9 ++-- airflow/example_dags/example_branch_operator.py | 8 ++-- .../example_branch_python_dop_operator_3.py | 5 +-- airflow/example_dags/example_http_operator.py | 7 ++- airflow/example_dags/example_latest_only.py | 4 +- .../example_latest_only_with_trigger.py | 4 +- .../example_passing_params_via_test_command.py | 6 +-- airflow/example_dags/example_python_operator.py | 7 +-- .../example_short_circuit_operator.py | 7 ++- airflow/example_dags/example_skip_dag.py | 9 ++-- airflow/example_dags/example_subdag_operator.py | 4 +- airflow/example_dags/example_xcom.py | 9 ++-- airflow/example_dags/test_utils.py | 3 +- airflow/example_dags/tutorial.py | 7 ++- airflow/utils/dates.py | 13 ++++++ dags/test_dag.py | 2 +- scripts/perf/dags/perf_dag_1.py | 7 ++- scripts/perf/dags/perf_dag_2.py | 8 ++-- tests/utils/__init__.py | 16 +++++++ tests/utils/dates.py | 45 ++++++++++++++++++++ 24 files changed, 132 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py index 18399c7..7f57ad1 100644 --- a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py +++ b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import timedelta, datetime - +from datetime import timedelta +import airflow from airflow import DAG from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor @@ -21,7 +21,7 @@ from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor DEFAULT_ARGS = { 'owner': 'airflow', 'depends_on_past': False, - 'start_date': datetime(2016, 3, 13), + 'start_date': airflow.utils.dates.days_ago(2), 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py b/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py index b498d50..caa6943 100644 --- a/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py +++ b/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import timedelta, datetime +from datetime import timedelta +import airflow from airflow import DAG from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator @@ -23,7 +24,7 @@ from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTermina DEFAULT_ARGS = { 'owner': 'airflow', 'depends_on_past': False, - 'start_date': datetime(2016, 3, 13), + 'start_date': airflow.utils.dates.days_ago(2), 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/contrib/example_dags/example_qubole_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py index b482cf4..fce0175 100644 --- a/airflow/contrib/example_dags/example_qubole_operator.py +++ b/airflow/contrib/example_dags/example_qubole_operator.py @@ -16,17 +16,15 @@ from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator, BranchPythonOperator from airflow.contrib.operators.qubole_operator import QuboleOperator -from datetime import datetime, timedelta import filecmp import random -seven_days_ago = datetime.combine(datetime.today() - timedelta(7), - datetime.min.time()) + default_args = { 'owner': 'airflow', 'depends_on_past': False, - 'start_date': seven_days_ago, + 'start_date': airflow.utils.dates.days_ago(2) 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/contrib/example_dags/example_twitter_dag.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/example_dags/example_twitter_dag.py b/airflow/contrib/example_dags/example_twitter_dag.py index d63b4e8..a25c8d0 100644 --- a/airflow/contrib/example_dags/example_twitter_dag.py +++ b/airflow/contrib/example_dags/example_twitter_dag.py @@ -22,11 +22,12 @@ # Load The Dependencies # -------------------------------------------------------------------------------- +import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.hive_operator import HiveOperator -from datetime import datetime, date, timedelta +from datetime import date, timedelta # -------------------------------------------------------------------------------- # Create a few placeholder scripts. In practice these would be different python @@ -57,7 +58,7 @@ def transfertodb(): default_args = { 'owner': 'Ekhtiar', 'depends_on_past': False, - 'start_date': datetime(2016, 3, 13), + 'start_date': airflow.utils.dates.days_ago(5), 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_bash_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_bash_operator.py b/airflow/example_dags/example_bash_operator.py index 0d18bcf..6887fa9 100644 --- a/airflow/example_dags/example_bash_operator.py +++ b/airflow/example_dags/example_bash_operator.py @@ -11,17 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import airflow from builtins import range from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG -from datetime import datetime, timedelta +from datetime import timedelta + -seven_days_ago = datetime.combine(datetime.today() - timedelta(7), - datetime.min.time()) args = { 'owner': 'airflow', - 'start_date': seven_days_ago, + 'start_date': airflow.utils.dates.days_ago(2) } dag = DAG( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_branch_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index cc559d0..2b11d91 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -11,17 +11,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import airflow from airflow.operators.python_operator import BranchPythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG -from datetime import datetime, timedelta import random -seven_days_ago = datetime.combine(datetime.today() - timedelta(7), - datetime.min.time()) + args = { 'owner': 'airflow', - 'start_date': seven_days_ago, + 'start_date': airflow.utils.dates.days_ago(2) } dag = DAG( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_branch_python_dop_operator_3.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_branch_python_dop_operator_3.py b/airflow/example_dags/example_branch_python_dop_operator_3.py index 1dd190e..6da7b68 100644 --- a/airflow/example_dags/example_branch_python_dop_operator_3.py +++ b/airflow/example_dags/example_branch_python_dop_operator_3.py @@ -13,16 +13,15 @@ # limitations under the License. # +import airflow from airflow.operators.python_operator import BranchPythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG from datetime import datetime, timedelta -two_days_ago = datetime.combine(datetime.today() - timedelta(2), - datetime.min.time()) args = { 'owner': 'airflow', - 'start_date': two_days_ago, + 'start_date': airflow.utils.dates.days_ago(2), 'depends_on_past': True, } http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_http_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_http_operator.py b/airflow/example_dags/example_http_operator.py index 18a67f5..0cc23b9 100644 --- a/airflow/example_dags/example_http_operator.py +++ b/airflow/example_dags/example_http_operator.py @@ -14,19 +14,18 @@ """ ### Example HTTP operator and sensor """ +import airflow from airflow import DAG from airflow.operators.http_operator import SimpleHttpOperator from airflow.operators.sensors import HttpSensor -from datetime import datetime, timedelta +from datetime import timedelta import json -seven_days_ago = datetime.combine(datetime.today() - timedelta(7), - datetime.min.time()) default_args = { 'owner': 'airflow', 'depends_on_past': False, - 'start_date': seven_days_ago, + 'start_date': airflow.utils.dates.days_ago(2), 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_latest_only.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_latest_only.py b/airflow/example_dags/example_latest_only.py index 9ce03b9..38ee900 100644 --- a/airflow/example_dags/example_latest_only.py +++ b/airflow/example_dags/example_latest_only.py @@ -16,16 +16,16 @@ Example of the LatestOnlyOperator """ import datetime as dt +import airflow from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.utils.trigger_rule import TriggerRule - dag = DAG( dag_id='latest_only', schedule_interval=dt.timedelta(hours=4), - start_date=dt.datetime(2016, 9, 20), + start_date=airflow.utils.dates.days_ago(2), ) latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_latest_only_with_trigger.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_latest_only_with_trigger.py b/airflow/example_dags/example_latest_only_with_trigger.py index e3a88b7..f2afdcf 100644 --- a/airflow/example_dags/example_latest_only_with_trigger.py +++ b/airflow/example_dags/example_latest_only_with_trigger.py @@ -16,16 +16,16 @@ Example LatestOnlyOperator and TriggerRule interactions """ import datetime as dt +import airflow from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.latest_only_operator import LatestOnlyOperator from airflow.utils.trigger_rule import TriggerRule - dag = DAG( dag_id='latest_only_with_trigger', schedule_interval=dt.timedelta(hours=4), - start_date=dt.datetime(2016, 9, 20), + start_date=airflow.utils.dates.days_ago(2), ) latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_passing_params_via_test_command.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py index e337f3b..448effb 100644 --- a/airflow/example_dags/example_passing_params_via_test_command.py +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -13,15 +13,15 @@ # limitations under the License. # -from datetime import datetime, timedelta - +from datetime import timedelta +import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator dag = DAG("example_passing_params_via_test_command", default_args={"owner": "airflow", - "start_date":datetime.now()}, + "start_date": airflow.utils.dates.days_ago(1)}, schedule_interval='*/1 * * * *', dagrun_timeout=timedelta(minutes=4) ) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_python_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index c5d7193..8108e1e 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -13,19 +13,16 @@ # limitations under the License. from __future__ import print_function from builtins import range +import airflow from airflow.operators.python_operator import PythonOperator from airflow.models import DAG -from datetime import datetime, timedelta import time from pprint import pprint -seven_days_ago = datetime.combine( - datetime.today() - timedelta(7), datetime.min.time()) - args = { 'owner': 'airflow', - 'start_date': seven_days_ago, + 'start_date': airflow.utils.dates.days_ago(2) } dag = DAG( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_short_circuit_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_short_circuit_operator.py b/airflow/example_dags/example_short_circuit_operator.py index 92efe99..c9812ac 100644 --- a/airflow/example_dags/example_short_circuit_operator.py +++ b/airflow/example_dags/example_short_circuit_operator.py @@ -11,17 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import airflow from airflow.operators.python_operator import ShortCircuitOperator from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG import airflow.utils.helpers -from datetime import datetime, timedelta -seven_days_ago = datetime.combine(datetime.today() - timedelta(7), - datetime.min.time()) + args = { 'owner': 'airflow', - 'start_date': seven_days_ago, + 'start_date': airflow.utils.dates.days_ago(2) } dag = DAG(dag_id='example_short_circuit_operator', default_args=args) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_skip_dag.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py index a38b126..b936020 100644 --- a/airflow/example_dags/example_skip_dag.py +++ b/airflow/example_dags/example_skip_dag.py @@ -12,16 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import airflow from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG -from datetime import datetime, timedelta from airflow.exceptions import AirflowSkipException -seven_days_ago = datetime.combine(datetime.today() - timedelta(1), - datetime.min.time()) + args = { 'owner': 'airflow', - 'start_date': seven_days_ago, + 'start_date': airflow.utils.dates.days_ago(2) } @@ -53,5 +52,3 @@ def create_test_pipeline(suffix, trigger_rule, dag): create_test_pipeline('1', 'all_success', dag) create_test_pipeline('2', 'one_success', dag) - - http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_subdag_operator.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_subdag_operator.py b/airflow/example_dags/example_subdag_operator.py index b872f43..0c11787 100644 --- a/airflow/example_dags/example_subdag_operator.py +++ b/airflow/example_dags/example_subdag_operator.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from datetime import datetime +import airflow from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator @@ -24,7 +24,7 @@ DAG_NAME = 'example_subdag_operator' args = { 'owner': 'airflow', - 'start_date': datetime(2016, 1, 1), + 'start_date': airflow.utils.dates.days_ago(2), } dag = DAG( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/example_xcom.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py index 50728c3..b41421b 100644 --- a/airflow/example_dags/example_xcom.py +++ b/airflow/example_dags/example_xcom.py @@ -12,22 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function +import airflow from airflow import DAG from airflow.operators.python_operator import PythonOperator -from datetime import datetime, timedelta -seven_days_ago = datetime.combine( - datetime.today() - timedelta(7), - datetime.min.time()) args = { 'owner': 'airflow', - 'start_date': seven_days_ago, + 'start_date': airflow.utils.dates.days_ago(2), 'provide_context': True } dag = DAG( 'example_xcom', - start_date=datetime(2015, 1, 1), schedule_interval="@once", default_args=args) @@ -60,6 +56,7 @@ def puller(**kwargs): v1, v2 = ti.xcom_pull(key=None, task_ids=['push', 'push_by_returning']) assert (v1, v2) == (value_1, value_2) + push1 = PythonOperator( task_id='push', dag=dag, python_callable=push) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/test_utils.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/test_utils.py b/airflow/example_dags/test_utils.py index 70391c3..0ed9bdb 100644 --- a/airflow/example_dags/test_utils.py +++ b/airflow/example_dags/test_utils.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """Used for unit tests""" +import airflow from airflow.operators.bash_operator import BashOperator from airflow.models import DAG from datetime import datetime @@ -25,5 +26,5 @@ task = BashOperator( task_id='sleeps_forever', dag=dag, bash_command="sleep 10000000000", - start_date=datetime(2016, 1, 1), + start_date=airflow.utils.dates.days_ago(2), owner='airflow') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/example_dags/tutorial.py ---------------------------------------------------------------------- diff --git a/airflow/example_dags/tutorial.py b/airflow/example_dags/tutorial.py index c7b2e0f..6ede09a 100644 --- a/airflow/example_dags/tutorial.py +++ b/airflow/example_dags/tutorial.py @@ -17,19 +17,18 @@ Documentation that goes along with the Airflow tutorial located [here](http://pythonhosted.org/airflow/tutorial.html) """ +import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator -from datetime import datetime, timedelta +from datetime import timedelta -seven_days_ago = datetime.combine(datetime.today() - timedelta(7), - datetime.min.time()) # these args will get passed on to each operator # you can override them on a per-task basis during operator initialization default_args = { 'owner': 'airflow', 'depends_on_past': False, - 'start_date': seven_days_ago, + 'start_date': airflow.utils.dates.days_ago(2), 'email': ['[email protected]'], 'email_on_failure': False, 'email_on_retry': False, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/airflow/utils/dates.py ---------------------------------------------------------------------- diff --git a/airflow/utils/dates.py b/airflow/utils/dates.py index 84fd791..f89b20c 100644 --- a/airflow/utils/dates.py +++ b/airflow/utils/dates.py @@ -212,3 +212,16 @@ def scale_time_units(time_seconds_arr, unit): elif unit == 'days': return list(map(lambda x: x*1.0/(24*60*60), time_seconds_arr)) return time_seconds_arr + + +def days_ago(n, hour=0, minute=0, second=0, microsecond=0): + """ + Get a datetime object representing `n` days ago. By default the time is + set to midnight. + """ + today = datetime.today().replace( + hour=hour, + minute=minute, + second=second, + microsecond=microsecond) + return today - timedelta(days=n) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/dags/test_dag.py ---------------------------------------------------------------------- diff --git a/dags/test_dag.py b/dags/test_dag.py index a1cbb74..db0b648 100644 --- a/dags/test_dag.py +++ b/dags/test_dag.py @@ -24,7 +24,7 @@ DAG_NAME = 'test_dag_v1' default_args = { 'owner': 'airflow', 'depends_on_past': True, - 'start_date': START_DATE, + 'start_date': airflow.utils.dates.days_ago(2) } dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *', default_args=default_args) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/scripts/perf/dags/perf_dag_1.py ---------------------------------------------------------------------- diff --git a/scripts/perf/dags/perf_dag_1.py b/scripts/perf/dags/perf_dag_1.py index d97c830..fe71303 100644 --- a/scripts/perf/dags/perf_dag_1.py +++ b/scripts/perf/dags/perf_dag_1.py @@ -11,15 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import airflow from airflow.operators.bash_operator import BashOperator from airflow.models import DAG -from datetime import datetime, timedelta +from datetime import timedelta -five_days_ago = datetime.combine(datetime.today() - timedelta(5), - datetime.min.time()) args = { 'owner': 'airflow', - 'start_date': five_days_ago, + 'start_date': airflow.utils.dates.days_ago(3), } dag = DAG( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/scripts/perf/dags/perf_dag_2.py ---------------------------------------------------------------------- diff --git a/scripts/perf/dags/perf_dag_2.py b/scripts/perf/dags/perf_dag_2.py index cccd547..16948d4 100644 --- a/scripts/perf/dags/perf_dag_2.py +++ b/scripts/perf/dags/perf_dag_2.py @@ -11,15 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +import airflow from airflow.operators.bash_operator import BashOperator from airflow.models import DAG -from datetime import datetime, timedelta +from datetime import timedelta -five_days_ago = datetime.combine(datetime.today() - timedelta(5), - datetime.min.time()) args = { 'owner': 'airflow', - 'start_date': five_days_ago, + 'start_date': airflow.utils.dates.days_ago(3), } dag = DAG( http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/tests/utils/__init__.py ---------------------------------------------------------------------- diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 0000000..6b15998 --- /dev/null +++ b/tests/utils/__init__.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .compression import * +from .dates import * http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3658bf31/tests/utils/dates.py ---------------------------------------------------------------------- diff --git a/tests/utils/dates.py b/tests/utils/dates.py new file mode 100644 index 0000000..dc0c87e --- /dev/null +++ b/tests/utils/dates.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime, timedelta +import unittest + +from airflow.utils import dates + +class Dates(unittest.TestCase): + + def test_days_ago(self): + today = datetime.today() + today_midnight = datetime.fromordinal(today.date().toordinal()) + + self.assertTrue(dates.days_ago(0) == today_midnight) + + self.assertTrue( + dates.days_ago(100) == today_midnight + timedelta(days=-100)) + + self.assertTrue( + dates.days_ago(0, hour=3) == today_midnight + timedelta(hours=3)) + self.assertTrue( + dates.days_ago(0, minute=3) + == today_midnight + timedelta(minutes=3)) + self.assertTrue( + dates.days_ago(0, second=3) + == today_midnight + timedelta(seconds=3)) + self.assertTrue( + dates.days_ago(0, microsecond=3) + == today_midnight + timedelta(microseconds=3)) + + +if __name__ == '__main__': + unittest.main()
