This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new ef80f6b028 Move away from deprecated DAG.following_schedule() method (#41773)
ef80f6b028 is described below
commit ef80f6b028f215574b08b124c44d8c28d719635b
Author: Jens Scheffler <[email protected]>
AuthorDate: Wed Aug 28 13:28:59 2024 +0200
Move away from deprecated DAG.following_schedule() method (#41773)
* Move away from deprecated DAG.following_schedule() method
* Update mock to the changed logic and remove one layer of Pendulum wrapping
---
 airflow/providers/google/cloud/operators/gcs.py    |  8 ++++----
 tests/providers/google/cloud/operators/test_gcs.py | 22 ++++++++++++++++++++--
 2 files changed, 24 insertions(+), 6 deletions(-)
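At a glance, the operator's KeyError fallback now asks the DAG's timetable for the next run and reads the data interval's end off it, instead of calling the deprecated following_schedule(). A minimal sketch of the new call pattern, distilled from the diff below (the standalone helper name is illustrative, not part of the patch):

    def resolve_interval_end(dag):
        # Mirrors the fallback branch in GCSTimeSpanFileTransformOperator:
        # ask the timetable for the next dagrun, unrestricted and with no
        # prior automated run, then read the data interval's end from it.
        next_dagrun = dag.next_dagrun_info(last_automated_dagrun=None, restricted=False)
        if next_dagrun and next_dagrun.data_interval and next_dagrun.data_interval.end:
            return next_dagrun.data_interval.end
        return None  # handled downstream; only possible in Airflow before 2.2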
diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py
index c396e173ea..561de4c46e 100644
--- a/airflow/providers/google/cloud/operators/gcs.py
+++ b/airflow/providers/google/cloud/operators/gcs.py
@@ -795,11 +795,11 @@ class GCSTimeSpanFileTransformOperator(GoogleCloudBaseOperator):
             orig_end = context["data_interval_end"]
         except KeyError:
             orig_start = pendulum.instance(context["execution_date"])
-            following_execution_date = context["dag"].following_schedule(context["execution_date"])
-            if following_execution_date is None:
-                orig_end = None
+            next_dagrun = context["dag"].next_dagrun_info(last_automated_dagrun=None, restricted=False)
+            if next_dagrun and next_dagrun.data_interval and next_dagrun.data_interval.end:
+                orig_end = next_dagrun.data_interval.end
             else:
-                orig_end = pendulum.instance(following_execution_date)
+                orig_end = None
 
         timespan_start = orig_start
         if orig_end is None:  # Only possible in Airflow before 2.2.
diff --git a/tests/providers/google/cloud/operators/test_gcs.py b/tests/providers/google/cloud/operators/test_gcs.py
index 1a5acd0bf6..fcb5fcb925 100644
--- a/tests/providers/google/cloud/operators/test_gcs.py
+++ b/tests/providers/google/cloud/operators/test_gcs.py
@@ -21,6 +21,7 @@ from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from unittest import mock
 
+import pendulum
 import pytest
 
 from airflow.providers.common.compat.openlineage.facet import (
@@ -40,6 +41,7 @@ from airflow.providers.google.cloud.operators.gcs import (
     GCSSynchronizeBucketsOperator,
     GCSTimeSpanFileTransformOperator,
 )
+from airflow.timetables.base import DagRunInfo, DataInterval
 
 TASK_ID = "test-gcs-operator"
 TEST_BUCKET = "test-bucket"
@@ -395,7 +397,15 @@ class TestGCSTimeSpanFileTransformOperator:
         timespan_start = datetime(2015, 2, 1, 15, 16, 17, 345, tzinfo=timezone.utc)
         timespan_end = timespan_start + timedelta(hours=1)
         mock_dag = mock.Mock()
-        mock_dag.following_schedule = lambda x: x + timedelta(hours=1)
+        mock_dag.next_dagrun_info.side_effect = [
+            DagRunInfo(
+                run_after=pendulum.instance(timespan_start),
+                data_interval=DataInterval(
+                    start=pendulum.instance(timespan_start),
+                    end=pendulum.instance(timespan_end),
+                ),
+            ),
+        ]
         mock_ti = mock.Mock()
         context = dict(
             execution_date=timespan_start,
@@ -575,7 +585,15 @@ class TestGCSTimeSpanFileTransformOperator:
         timespan_start = datetime(2015, 2, 1, 15, 16, 17, 345, tzinfo=timezone.utc)
         mock_dag = mock.Mock()
-        mock_dag.following_schedule = lambda x: x + timedelta(hours=1)
+        mock_dag.next_dagrun_info.side_effect = [
+            DagRunInfo(
+                run_after=pendulum.instance(timespan_start),
+                data_interval=DataInterval(
+                    start=pendulum.instance(timespan_start),
+                    end=None,
+                ),
+            ),
+        ]
 
         context = dict(
             execution_date=timespan_start,
             dag=mock_dag,
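For anyone adapting similar tests, the mock pattern above can be exercised standalone roughly as follows (a minimal sketch; the test name and assertion are illustrative, assuming an Airflow 2.x environment where airflow.timetables.base is importable):

    from datetime import datetime, timedelta, timezone
    from unittest import mock

    import pendulum

    from airflow.timetables.base import DagRunInfo, DataInterval


    def test_next_dagrun_info_mock_sketch():
        timespan_start = datetime(2015, 2, 1, 15, 16, 17, 345, tzinfo=timezone.utc)
        timespan_end = timespan_start + timedelta(hours=1)
        mock_dag = mock.Mock()
        # A one-element side_effect list makes the mock return this DagRunInfo
        # exactly once, mirroring the operator's single next_dagrun_info() call.
        mock_dag.next_dagrun_info.side_effect = [
            DagRunInfo(
                run_after=pendulum.instance(timespan_start),
                data_interval=DataInterval(
                    start=pendulum.instance(timespan_start),
                    end=pendulum.instance(timespan_end),
                ),
            ),
        ]
        info = mock_dag.next_dagrun_info(last_automated_dagrun=None, restricted=False)
        assert info.data_interval.end == pendulum.instance(timespan_end)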