This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e35561d5e15 Fix generate_if for BQ if the run_type=scheduled (#58035)
e35561d5e15 is described below
commit e35561d5e15ab45f52d32078a383edca618b29eb
Author: VladaZakharova <[email protected]>
AuthorDate: Mon Nov 17 11:48:44 2025 +0100
Fix generate_if for BQ if the run_type=scheduled (#58035)
---
.../providers/google/cloud/hooks/bigquery.py | 21 +++++-------
.../tests/unit/google/cloud/hooks/test_bigquery.py | 40 +++++++++++++++++++---
2 files changed, 45 insertions(+), 16 deletions(-)
diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
index b8d2451bc8f..1a79d6f11b7 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py
@@ -81,6 +81,7 @@ from airflow.providers.google.version_compat import AIRFLOW_V_3_0_PLUS
from airflow.utils.hashlib_wrapper import md5
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.types import DagRunType
if TYPE_CHECKING:
import pandas as pd
@@ -1312,7 +1313,7 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
task_id: str,
logical_date: datetime | None,
configuration: dict,
- run_after: pendulum.DateTime | None = None,
+ run_after: pendulum.DateTime | datetime | None = None,
force_rerun: bool = False,
) -> str:
if force_rerun:
@@ -1341,18 +1342,14 @@ class BigQueryHook(GoogleBaseHook, DbApiHook):
job_id = f"airflow_{dag_id}_{task_id}_{job_id_timestamp.isoformat()}_{uniqueness_suffix}"
return re.sub(r"[:\-+.]", "_", job_id)
- def get_run_after_or_logical_date(self, context: Context) -> pendulum.DateTime:
+ def get_run_after_or_logical_date(self, context: Context) -> pendulum.DateTime | datetime | None:
+ dag_run = context.get("dag_run")
+ if not dag_run:
+ return pendulum.now("UTC")
+
if AIRFLOW_V_3_0_PLUS:
- if dag_run := context.get("dag_run"):
- run_after = pendulum.instance(dag_run.run_after)
- else:
- run_after = pendulum.now("UTC")
- else:
- if logical_date := context.get("logical_date"):
- run_after = logical_date
- else:
- run_after = pendulum.now("UTC")
- return run_after
+ return dag_run.start_date
+ return dag_run.start_date if dag_run.run_type == DagRunType.SCHEDULED else context.get("logical_date")
def split_tablename(
self, table_input: str, default_project_id: str, var_name: str | None = None
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
index 2ad9c025044..e5d899ce96f 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_bigquery.py
@@ -40,6 +40,7 @@ from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.exceptions import NotFound
from airflow.exceptions import AirflowException
+from airflow.models import DagRun
from airflow.providers.common.compat.assets import Asset
from airflow.providers.common.compat.sdk import Context
from airflow.providers.google.cloud.hooks.bigquery import (
@@ -52,6 +53,7 @@ from airflow.providers.google.cloud.hooks.bigquery import (
_validate_src_fmt_configs,
_validate_value,
)
+from airflow.utils.types import DagRunType
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
@@ -684,14 +686,44 @@ class TestBigQueryHookMethods(_BigQueryBaseTestClass):
assert job_id == expected_job_id
def test_get_run_after_or_logical_date(self):
+ """Test get_run_after_or_logical_date for both Airflow 3.x and pre-3.0
behavior."""
if AIRFLOW_V_3_0_PLUS:
- from airflow.models import DagRun
+ ctx = Context(
+ dag_run=DagRun(
+ run_type=DagRunType.MANUAL,
+ start_date=pendulum.datetime(2025, 2, 2, tz="UTC"),
+ ),
+ logical_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
+ )
+ assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 2, 2, tz="UTC")
+
+ ctx = Context(
+ dag_run=DagRun(
+ run_type=DagRunType.SCHEDULED,
+ start_date=pendulum.datetime(2025, 2, 2, tz="UTC"),
+ ),
+ logical_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
+ )
+ assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 2, 2, tz="UTC")
- ctx = Context(dag_run=DagRun(run_after=pendulum.datetime(2025, 1, 1)))
else:
- ctx = Context(logical_date=pendulum.datetime(2025, 1, 1))
+ ctx = Context(
+ dag_run=DagRun(
+ run_type=DagRunType.MANUAL,
+ start_date=pendulum.datetime(2025, 2, 2, tz="UTC"),
+ ),
+ logical_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
+ )
+ assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 1, 1, tz="UTC")
- assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 1, 1)
+ ctx = Context(
+ dag_run=DagRun(
+ run_type=DagRunType.SCHEDULED,
+ start_date=pendulum.datetime(2025, 2, 2, tz="UTC"),
+ ),
+ logical_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
+ )
+ assert self.hook.get_run_after_or_logical_date(ctx) == pendulum.datetime(2025, 2, 2, tz="UTC")
@mock.patch(
"airflow.providers.google.cloud.hooks.bigquery.BigQueryHook.get_job",