This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e35561d5e15 Fix generate_if for BQ if the run_type=scheduled (#58035)
e35561d5e15 is described below

commit e35561d5e15ab45f52d32078a383edca618b29eb
Author: VladaZakharova <[email protected]>
AuthorDate: Mon Nov 17 11:48:44 2025 +0100

    Fix generate_if for BQ if the run_type=scheduled (#58035)
---
 .../providers/google/cloud/hooks/bigquery.py       | 21 +++++-------
 .../tests/unit/google/cloud/hooks/test_bigquery.py | 40 +++++++++++++++++++---
 2 files changed, 45 insertions(+), 16 deletions(-)

diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index b8d2451bc8f..1a79d6f11b7 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -81,6 +81,7 @@ from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS
 from airflow.utils.hashlib_wrapper import md5
 from airflow.utils.helpers import convert_camel_to_snake
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.types import DagRunType
 
 if TYPE_CHECKING:
     import pandas as pd
@@ -1312,7 +1313,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         task_id: str,
         logical_date: datetime | None,
         configuration: dict,
-        run_after: pendulum.DateTime | None = None,
+        run_after: pendulum.DateTime | datetime | None = None,
         force_rerun: bool = False,
     ) -> str:
         if force_rerun:
@@ -1341,18 +1342,14 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
         job_id = f"airflow_{dag_id}_{task_id}_{job_id_timestamp.isoformat()}_{uniqueness_suffix}"
         return re.sub(r"[:\-+.]", "_", job_id)
 
-    def get_run_after_or_logical_date(self, context: Context) -> pendulum.DateTime:
+    def get_run_after_or_logical_date(self, context: Context) -> pendulum.DateTime | datetime | None:
+        dag_run = context.get("dag_run")
+        if not dag_run:
+            return pendulum.now("UTC")
+
         if AIRFLOW_V_3_0_PLUS:
-            if dag_run := context.get("dag_run"):
-                run_after = pendulum.instance(dag_run.run_after)
-            else:
-                run_after = pendulum.now("UTC")
-        else:
-            if logical_date := context.get("logical_date"):
-                run_after = logical_date
-            else:
-                run_after = pendulum.now("UTC")
-        return run_after
+            return dag_run.start_date
+        return dag_run.start_date if dag_run.run_type == DagRunType.SCHEDULED else context.get("logical_date")
 
     def split_tablename(
         self, table_input: str, default_project_id: str, var_name: str | None = None
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index 2ad9c025044..e5d899ce96f 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -40,6 +40,7 @@ from google.cloud.bigquery.table import _EmptyRowIterator
 from google.cloud.exceptions import NotFound
 
 from airflow.exceptions import AirflowException
+from airflow.models import DagRun
 from airflow.providers.common.compat.assets import Asset
 from airflow.providers.common.compat.sdk import Context
 from airflow.providers.google.cloud.hooks.bigquery import (
@@ -52,6 +53,7 @@ from airflow.providers.google.cloud.hooks.bigquery import (
     _validate_src_fmt_configs,
     _validate_value,
 )
+from airflow.utils.types import DagRunType
 
 from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
 
@@ -684,14 +686,44 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
         assert job_id == expected_job_id
 
     def test_get_run_after_or_logical_date(self):
+        """Test get_run_after_or_logical_date for both Airflow 3.x and pre-3.0 behavior."""
         if AIRFLOW_V_3_0_PLUS:
-            from airflow.models import DagRun
+            ctx = Context(
+                dag_run=DagRun(
+                    run_type=DagRunType.MANUAL,
+                    start_date=pendulum.datetime(2025, 2, 2, tz="UTC"),
+                ),
+                logical_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
+            )
+            assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 2, 2, tz="UTC")
+
+            ctx = Context(
+                dag_run=DagRun(
+                    run_type=DagRunType.SCHEDULED,
+                    start_date=pendulum.datetime(2025, 2, 2, tz="UTC"),
+                ),
+                logical_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
+            )
+            assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 2, 2, tz="UTC")
 
-            ctx = Context(dag_run=DagRun(run_after=pendulum.datetime(2025, 1, 1)))
         else:
-            ctx = Context(logical_date=pendulum.datetime(2025, 1, 1))
+            ctx = Context(
+                dag_run=DagRun(
+                    run_type=DagRunType.MANUAL,
+                    start_date=pendulum.datetime(2025, 2, 2, tz="UTC"),
+                ),
+                logical_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
+            )
+            assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 1, 1, tz="UTC")
 
-        assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 1, 1)
+            ctx = Context(
+                dag_run=DagRun(
+                    run_type=DagRunType.SCHEDULED,
+                    start_date=pendulum.datetime(2025, 2, 2, tz="UTC"),
+                ),
+                logical_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
+            )
+            assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 2, 2, tz="UTC")
 
     @mock.patch(
         "airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_job",

Reply via email to