This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ef80f6b028 Move away from deprecated DAG.following_schedule() method (#41773)
ef80f6b028 is described below

commit ef80f6b028f215574b08b124c44d8c28d719635b
Author: Jens Scheffler <[email protected]>
AuthorDate: Wed Aug 28 13:28:59 2024 +0200

    Move away from deprecated DAG.following_schedule() method (#41773)
    
    * Move away from deprecated DAG.following_schedule() method
    
    * Update mock to changed logic and remove one layer of Pendulum
---
 airflow/providers/google/cloud/operators/gcs.py    |  8 ++++----
 tests/providers/google/cloud/operators/test_gcs.py | 22 ++++++++++++++++++++--
 2 files changed, 24 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/gcs.py b/airflow/providers/google/cloud/operators/gcs.py
index c396e173ea..561de4c46e 100644
--- a/airflow/providers/google/cloud/operators/gcs.py
+++ b/airflow/providers/google/cloud/operators/gcs.py
@@ -795,11 +795,11 @@ class GCSTimeSpanFileTransformOperator(GoogleCloudBaseOperator):
             orig_end = context["data_interval_end"]
         except KeyError:
             orig_start = pendulum.instance(context["execution_date"])
-            following_execution_date = context["dag"].following_schedule(context["execution_date"])
-            if following_execution_date is None:
-                orig_end = None
+            next_dagrun = context["dag"].next_dagrun_info(last_automated_dagrun=None, restricted=False)
+            if next_dagrun and next_dagrun.data_interval and next_dagrun.data_interval.end:
+                orig_end = next_dagrun.data_interval.end
             else:
-                orig_end = pendulum.instance(following_execution_date)
+                orig_end = None
 
         timespan_start = orig_start
         if orig_end is None:  # Only possible in Airflow before 2.2.
diff --git a/tests/providers/google/cloud/operators/test_gcs.py b/tests/providers/google/cloud/operators/test_gcs.py
index 1a5acd0bf6..fcb5fcb925 100644
--- a/tests/providers/google/cloud/operators/test_gcs.py
+++ b/tests/providers/google/cloud/operators/test_gcs.py
@@ -21,6 +21,7 @@ from datetime import datetime, timedelta, timezone
 from pathlib import Path
 from unittest import mock
 
+import pendulum
 import pytest
 
 from airflow.providers.common.compat.openlineage.facet import (
@@ -40,6 +41,7 @@ from airflow.providers.google.cloud.operators.gcs import (
     GCSSynchronizeBucketsOperator,
     GCSTimeSpanFileTransformOperator,
 )
+from airflow.timetables.base import DagRunInfo, DataInterval
 
 TASK_ID = "test-gcs-operator"
 TEST_BUCKET = "test-bucket"
@@ -395,7 +397,15 @@ class TestGCSTimeSpanFileTransformOperator:
         timespan_start = datetime(2015, 2, 1, 15, 16, 17, 345, tzinfo=timezone.utc)
         timespan_end = timespan_start + timedelta(hours=1)
         mock_dag = mock.Mock()
-        mock_dag.following_schedule = lambda x: x + timedelta(hours=1)
+        mock_dag.next_dagrun_info.side_effect = [
+            DagRunInfo(
+                run_after=pendulum.instance(timespan_start),
+                data_interval=DataInterval(
+                    start=pendulum.instance(timespan_start),
+                    end=pendulum.instance(timespan_end),
+                ),
+            ),
+        ]
         mock_ti = mock.Mock()
         context = dict(
             execution_date=timespan_start,
@@ -575,7 +585,15 @@ class TestGCSTimeSpanFileTransformOperator:
 
         timespan_start = datetime(2015, 2, 1, 15, 16, 17, 345, tzinfo=timezone.utc)
         mock_dag = mock.Mock()
-        mock_dag.following_schedule = lambda x: x + timedelta(hours=1)
+        mock_dag.next_dagrun_info.side_effect = [
+            DagRunInfo(
+                run_after=pendulum.instance(timespan_start),
+                data_interval=DataInterval(
+                    start=pendulum.instance(timespan_start),
+                    end=None,
+                ),
+            ),
+        ]
         context = dict(
             execution_date=timespan_start,
             dag=mock_dag,

Reply via email to